Skip to content

Commit

Permalink
[awss3receiver] Add metrics and logs support (#33558)
Browse files Browse the repository at this point in the history
**Description:** Add support for receiving Logs and Metrics using the
AWS S3 Receiver

**Link to tracking Issue:** #30750

**Testing:** New unit tests added for Logs and Metrics
  • Loading branch information
adcharre authored Jul 5, 2024
1 parent 59170a3 commit f8b264c
Show file tree
Hide file tree
Showing 10 changed files with 503 additions and 46 deletions.
27 changes: 27 additions & 0 deletions .chloggen/awss3receiver_metrics_logs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: awss3receiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for retrieving logs and metrics to the AWS S3 Receiver.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [30750]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
2 changes: 1 addition & 1 deletion receiver/awss3receiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [development]: traces |
| Stability | [development]: traces, metrics, logs |
| Distributions | [] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Areceiver%2Fawss3%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Areceiver%2Fawss3) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Areceiver%2Fawss3%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Areceiver%2Fawss3) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@atoulme](https://www.github.com/atoulme), [@adcharre](https://www.github.com/adcharre) |
Expand Down
10 changes: 10 additions & 0 deletions receiver/awss3receiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,19 @@ func NewFactory() receiver.Factory {
metadata.Type,
createDefaultConfig,
receiver.WithTraces(createTracesReceiver, metadata.TracesStability),
receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability),
receiver.WithLogs(createLogsReceiver, metadata.LogsStability),
)
}

func createTracesReceiver(ctx context.Context, settings receiver.Settings, cc component.Config, consumer consumer.Traces) (receiver.Traces, error) {
return newAWSS3TraceReceiver(ctx, cc.(*Config), consumer, settings)
}

func createMetricsReceiver(ctx context.Context, settings receiver.Settings, cc component.Config, consumer consumer.Metrics) (receiver.Metrics, error) {
return newAWSS3MetricsReceiver(ctx, cc.(*Config), consumer, settings)
}

func createLogsReceiver(ctx context.Context, settings receiver.Settings, cc component.Config, consumer consumer.Logs) (receiver.Logs, error) {
return newAWSS3LogsReceiver(ctx, cc.(*Config), consumer, settings)
}
14 changes: 14 additions & 0 deletions receiver/awss3receiver/generated_component_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions receiver/awss3receiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ require (
github.com/aws/smithy-go v1.20.2 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 // indirect
Expand All @@ -55,7 +55,7 @@ require (
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.19.1 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.54.0 // indirect
Expand Down
6 changes: 4 additions & 2 deletions receiver/awss3receiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion receiver/awss3receiver/internal/metadata/generated_status.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion receiver/awss3receiver/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ type: awss3
status:
class: receiver
stability:
development: [traces]
development: [traces,metrics,logs]
distributions: []
codeowners:
active: [atoulme, adcharre]
Expand Down
155 changes: 130 additions & 25 deletions receiver/awss3receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,37 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receiverhelper"
"go.uber.org/zap"
)

type encodingExtension struct {
extension ptrace.Unmarshaler
extension component.Component
suffix string
}

type encodingExtensions []encodingExtension

type awss3TraceReceiver struct {
type receiverProcessor interface {
processReceivedData(ctx context.Context, receiver *awss3Receiver, key string, data []byte) error
}

type awss3Receiver struct {
s3Reader *s3Reader
consumer consumer.Traces
logger *zap.Logger
cancel context.CancelFunc
obsrecv *receiverhelper.ObsReport
encodingsConfig []Encoding
telemetryType string
dataProcessor receiverProcessor
extensions encodingExtensions
}

func newAWSS3TraceReceiver(ctx context.Context, cfg *Config, traces consumer.Traces, settings receiver.Settings) (*awss3TraceReceiver, error) {
func newAWSS3Receiver(ctx context.Context, cfg *Config, telemetryType string, settings receiver.Settings, processor receiverProcessor) (*awss3Receiver, error) {
reader, err := newS3Reader(ctx, cfg)
if err != nil {
return nil, err
Expand All @@ -50,17 +57,18 @@ func newAWSS3TraceReceiver(ctx context.Context, cfg *Config, traces consumer.Tra
return nil, err
}

return &awss3TraceReceiver{
return &awss3Receiver{
s3Reader: reader,
consumer: traces,
telemetryType: telemetryType,
logger: settings.Logger,
cancel: nil,
obsrecv: obsrecv,
dataProcessor: processor,
encodingsConfig: cfg.Encodings,
}, nil
}

func (r *awss3TraceReceiver) Start(_ context.Context, host component.Host) error {
func (r *awss3Receiver) Start(_ context.Context, host component.Host) error {
var err error
r.extensions, err = newEncodingExtensions(r.encodingsConfig, host)
if err != nil {
Expand All @@ -70,23 +78,22 @@ func (r *awss3TraceReceiver) Start(_ context.Context, host component.Host) error
var ctx context.Context
ctx, r.cancel = context.WithCancel(context.Background())
go func() {
_ = r.s3Reader.readAll(ctx, "traces", r.receiveBytes)
_ = r.s3Reader.readAll(ctx, r.telemetryType, r.receiveBytes)
}()
return nil
}

func (r *awss3TraceReceiver) Shutdown(_ context.Context) error {
func (r *awss3Receiver) Shutdown(_ context.Context) error {
if r.cancel != nil {
r.cancel()
}
return nil
}

func (r *awss3TraceReceiver) receiveBytes(ctx context.Context, key string, data []byte) error {
func (r *awss3Receiver) receiveBytes(ctx context.Context, key string, data []byte) error {
if data == nil {
return nil
}

if strings.HasSuffix(key, ".gz") {
reader, err := gzip.NewReader(bytes.NewReader(data))
if err != nil {
Expand All @@ -98,8 +105,26 @@ func (r *awss3TraceReceiver) receiveBytes(ctx context.Context, key string, data
return err
}
}
return r.dataProcessor.processReceivedData(ctx, r, key, data)
}

type traceReceiver struct {
consumer consumer.Traces
}

func newAWSS3TraceReceiver(ctx context.Context, cfg *Config, traces consumer.Traces, settings receiver.Settings) (*awss3Receiver, error) {
return newAWSS3Receiver(ctx, cfg, "traces", settings, &traceReceiver{consumer: traces})
}

func (r *traceReceiver) processReceivedData(ctx context.Context, rcvr *awss3Receiver, key string, data []byte) error {
var unmarshaler ptrace.Unmarshaler
var format string

if extension, f := rcvr.extensions.findExtension(key); extension != nil {
unmarshaler, _ = extension.(ptrace.Unmarshaler)
format = f
}

unmarshaler, format := r.extensions.findExtension(key)
if unmarshaler == nil {
if strings.HasSuffix(key, ".json") {
unmarshaler = &ptrace.JSONUnmarshaler{}
Expand All @@ -111,38 +136,118 @@ func (r *awss3TraceReceiver) receiveBytes(ctx context.Context, key string, data
}
}
if unmarshaler == nil {
r.logger.Warn("Unsupported file format", zap.String("key", key))
rcvr.logger.Warn("Unsupported file format", zap.String("key", key))
return nil
}
traces, err := unmarshaler.UnmarshalTraces(data)
if err != nil {
return err
}
obsCtx := r.obsrecv.StartTracesOp(ctx)
obsCtx := rcvr.obsrecv.StartTracesOp(ctx)
err = r.consumer.ConsumeTraces(ctx, traces)
r.obsrecv.EndTracesOp(obsCtx, format, traces.SpanCount(), err)
rcvr.obsrecv.EndTracesOp(obsCtx, format, traces.SpanCount(), err)
return err
}

type metricsReceiver struct {
consumer consumer.Metrics
}

func newAWSS3MetricsReceiver(ctx context.Context, cfg *Config, metrics consumer.Metrics, settings receiver.Settings) (*awss3Receiver, error) {
return newAWSS3Receiver(ctx, cfg, "metrics", settings, &metricsReceiver{consumer: metrics})
}

func (r *metricsReceiver) processReceivedData(ctx context.Context, rcvr *awss3Receiver, key string, data []byte) error {
var unmarshaler pmetric.Unmarshaler
var format string

if extension, f := rcvr.extensions.findExtension(key); extension != nil {
unmarshaler, _ = extension.(pmetric.Unmarshaler)
format = f
}

if unmarshaler == nil {
if strings.HasSuffix(key, ".json") {
unmarshaler = &pmetric.JSONUnmarshaler{}
format = "otlp_json"
}
if strings.HasSuffix(key, ".binpb") {
unmarshaler = &pmetric.ProtoUnmarshaler{}
format = "otlp_proto"
}
}
if unmarshaler == nil {
rcvr.logger.Warn("Unsupported file format", zap.String("key", key))
return nil
}
metrics, err := unmarshaler.UnmarshalMetrics(data)
if err != nil {
return err
}
obsCtx := rcvr.obsrecv.StartMetricsOp(ctx)
err = r.consumer.ConsumeMetrics(ctx, metrics)
rcvr.obsrecv.EndMetricsOp(obsCtx, format, metrics.MetricCount(), err)
return err
}

type logsReceiver struct {
consumer consumer.Logs
}

func newAWSS3LogsReceiver(ctx context.Context, cfg *Config, logs consumer.Logs, settings receiver.Settings) (*awss3Receiver, error) {
return newAWSS3Receiver(ctx, cfg, "logs", settings, &logsReceiver{consumer: logs})
}

func (r *logsReceiver) processReceivedData(ctx context.Context, rcvr *awss3Receiver, key string, data []byte) error {
var unmarshaler plog.Unmarshaler
var format string

if extension, f := rcvr.extensions.findExtension(key); extension != nil {
unmarshaler, _ = extension.(plog.Unmarshaler)
format = f
}

if unmarshaler == nil {
if strings.HasSuffix(key, ".json") {
unmarshaler = &plog.JSONUnmarshaler{}
format = "otlp_json"
}
if strings.HasSuffix(key, ".binpb") {
unmarshaler = &plog.ProtoUnmarshaler{}
format = "otlp_proto"
}
}
if unmarshaler == nil {
rcvr.logger.Warn("Unsupported file format", zap.String("key", key))
return nil
}
logs, err := unmarshaler.UnmarshalLogs(data)
if err != nil {
return err
}
obsCtx := rcvr.obsrecv.StartLogsOp(ctx)
err = r.consumer.ConsumeLogs(ctx, logs)
rcvr.obsrecv.EndLogsOp(obsCtx, format, logs.LogRecordCount(), err)
return err
}

func newEncodingExtensions(encodingsConfig []Encoding, host component.Host) (encodingExtensions, error) {
encodings := make(encodingExtensions, 0)
extensions := host.GetExtensions()
for _, encoding := range encodingsConfig {
if e, ok := extensions[encoding.Extension]; ok {
if u, ok := e.(ptrace.Unmarshaler); ok {
encodings = append(encodings, encodingExtension{extension: u, suffix: encoding.Suffix})
}
for _, configItem := range encodingsConfig {
if e, ok := extensions[configItem.Extension]; ok {
encodings = append(encodings, encodingExtension{extension: e, suffix: configItem.Suffix})
} else {
return nil, fmt.Errorf("extension %q not found", encoding.Extension)
return nil, fmt.Errorf("extension %q not found", configItem.Extension)
}
}
return encodings, nil
}

func (encodings encodingExtensions) findExtension(key string) (ptrace.Unmarshaler, string) {
for _, encoding := range encodings {
if strings.HasSuffix(key, encoding.suffix) {
return encoding.extension, encoding.suffix
func (encodings encodingExtensions) findExtension(key string) (component.Component, string) {
for _, e := range encodings {
if strings.HasSuffix(key, e.suffix) {
return e.extension, e.suffix
}
}
return nil, ""
Expand Down
Loading

0 comments on commit f8b264c

Please sign in to comment.