Skip to content

Commit

Permalink
refactor: consistent err handling for OpenTelemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
Chief-Rishab authored and ravisuhag committed Aug 23, 2023
1 parent 38c01c9 commit 8f3a769
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 101 deletions.
4 changes: 2 additions & 2 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (r *Agent) setupProcessor(ctx context.Context, pr recipe.PluginRecipe, str
return fmt.Errorf("find processor %q: %w", pr.Name, err)
}

proc, err = otelmw.WithProcessorMW(proc, pr.Name, recipeName)
proc = otelmw.WithProcessor(pr.Name, recipeName)(proc)
if err != nil {
return fmt.Errorf("wrap processor %q: %w", pr.Name, err)
}
Expand Down Expand Up @@ -269,7 +269,7 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s
return fmt.Errorf("find sink %q: %w", sr.Name, err)
}

sink, err = otelmw.WithSinkMW(sink, sr.Name, recipeName)
sink = otelmw.WithSink(sr.Name, recipeName)(sink)
if err != nil {
return fmt.Errorf("wrap otel sink %q: %w", sr.Name, err)
}
Expand Down
9 changes: 1 addition & 8 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,7 @@ func RunCmd() *cobra.Command {
}
defer doneOtlp()

mt, err := metrics.NewOtelMonitor()
if err != nil {
return err
}

if mt != nil {
mts = append(mts, mt)
}
mts = append(mts, metrics.NewOtelMonitor())
}

runner := agent.NewAgent(agent.Config{
Expand Down
6 changes: 2 additions & 4 deletions metrics/opentelemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,10 @@ func TestOtelMonitor_RecordRun(t *testing.T) {
defer done()
assert.Nil(t, err)

monitor, err := metrics.NewOtelMonitor()
monitor := metrics.NewOtelMonitor()

monitor.RecordRun(ctx, agent.Run{Recipe: recipe, DurationInMs: duration, RecordCount: recordCount, Success: false})

assert.Nil(t, err)
assert.NotNil(t, monitor)
assert.NotNil(t, done)
})
Expand All @@ -62,7 +61,7 @@ func TestOtelMonitor_RecordPlugin(t *testing.T) {
defer done()
assert.Nil(t, err)

monitor, err := metrics.NewOtelMonitor()
monitor := metrics.NewOtelMonitor()

monitor.RecordPlugin(context.Background(),
agent.PluginInfo{
Expand All @@ -71,7 +70,6 @@ func TestOtelMonitor_RecordPlugin(t *testing.T) {
PluginType: "sink",
Success: true,
})
assert.Nil(t, err)
assert.NotNil(t, monitor)
assert.NotNil(t, done)
})
Expand Down
34 changes: 16 additions & 18 deletions metrics/otel_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,47 +11,39 @@ import (

// OtelMonitor represents the otel monitor.
type OtelMonitor struct {
recipeDuration metric.Int64Histogram
recipeDuration metric.Float64Histogram
extractorRetries metric.Int64Counter
assetsExtracted metric.Int64Counter
sinkRetries metric.Int64Counter
}

func NewOtelMonitor() (*OtelMonitor, error) {
func NewOtelMonitor() *OtelMonitor {
// init meters
meter := otel.Meter("")
recipeDuration, err := meter.Int64Histogram("meteor.recipe.duration", metric.WithUnit("ms"))
if err != nil {
return nil, err
}
meter := otel.Meter("github.com/raystack/meteor/metrics")
recipeDuration, err := meter.Float64Histogram("meteor.recipe.duration", metric.WithUnit("s"))
handleOtelErr(err)

extractorRetries, err := meter.Int64Counter("meteor.extractor.retries")
if err != nil {
return nil, err
}
handleOtelErr(err)

assetsExtracted, err := meter.Int64Counter("meteor.assets.extracted")
if err != nil {
return nil, err
}
handleOtelErr(err)

sinkRetries, err := meter.Int64Counter("meteor.sink.retries")
if err != nil {
return nil, err
}
handleOtelErr(err)

return &OtelMonitor{
recipeDuration: recipeDuration,
extractorRetries: extractorRetries,
assetsExtracted: assetsExtracted,
sinkRetries: sinkRetries,
}, nil
}
}

// RecordRun records a run behavior
func (m *OtelMonitor) RecordRun(ctx context.Context, run agent.Run) {
m.recipeDuration.Record(ctx,
int64(run.DurationInMs),
float64(run.DurationInMs)/1000.0,
metric.WithAttributes(
attribute.String("recipe_name", run.Recipe.Name),
attribute.String("extractor", run.Recipe.Source.Name),
Expand Down Expand Up @@ -89,3 +81,9 @@ func (m *OtelMonitor) RecordSinkRetryCount(ctx context.Context, pluginInfo agent
attribute.Int64("batch_size", int64(pluginInfo.BatchSize)),
))
}

func handleOtelErr(err error) {
if err != nil {
otel.Handle(err)
}
}
33 changes: 17 additions & 16 deletions metrics/otelmw/processorsmw.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,42 +11,43 @@ import (
"go.opentelemetry.io/otel/metric"
)

type ProcessorMW struct {
type Processor struct {
next plugins.Processor
duration metric.Int64Histogram
pluginName string
recipeName string
}

func WithProcessorMW(p plugins.Processor, pluginName, recipeName string) (plugins.Processor, error) {
meter := otel.Meter("")

processorDuration, err := meter.Int64Histogram("meteor.processor.duration", metric.WithUnit("ms"))
func WithProcessor(pluginName, recipeName string) func(plugins.Processor) plugins.Processor {
processorDuration, err := otel.Meter("github.com/raystack/meteor/metrics/otelmw").
Int64Histogram("meteor.processor.duration", metric.WithUnit("ms"))
if err != nil {
return nil, err
otel.Handle(err)
}

return &ProcessorMW{
next: p,
duration: processorDuration,
pluginName: pluginName,
recipeName: recipeName,
}, nil
return func(p plugins.Processor) plugins.Processor {
return &Processor{
next: p,
duration: processorDuration,
pluginName: pluginName,
recipeName: recipeName,
}
}
}

func (mw *ProcessorMW) Init(ctx context.Context, cfg plugins.Config) error {
func (mw *Processor) Init(ctx context.Context, cfg plugins.Config) error {
return mw.next.Init(ctx, cfg)
}

func (mw *ProcessorMW) Info() plugins.Info {
func (mw *Processor) Info() plugins.Info {
return mw.next.Info()
}

func (mw *ProcessorMW) Validate(cfg plugins.Config) error {
func (mw *Processor) Validate(cfg plugins.Config) error {
return mw.next.Validate(cfg)
}

func (mw *ProcessorMW) Process(ctx context.Context, src models.Record) (dst models.Record, err error) {
func (mw *Processor) Process(ctx context.Context, src models.Record) (dst models.Record, err error) {
defer func(start time.Time) {
mw.duration.Record(ctx,
time.Since(start).Milliseconds(),
Expand Down
35 changes: 18 additions & 17 deletions metrics/otelmw/sinksmw.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,46 +11,47 @@ import (
"go.opentelemetry.io/otel/metric"
)

type SinksMW struct {
type Sinks struct {
next plugins.Syncer
duration metric.Int64Histogram
pluginName string
recipeName string
}

func WithSinkMW(s plugins.Syncer, pluginName, recipeName string) (plugins.Syncer, error) {
meter := otel.Meter("")

sinkDuration, err := meter.Int64Histogram("meteor.sink.duration", metric.WithUnit("ms"))
func WithSink(pluginName, recipeName string) func(plugins.Syncer) plugins.Syncer {
sinkDuration, err := otel.Meter("github.com/raystack/meteor/metrics/otelmw").
Int64Histogram("meteor.sink.duration", metric.WithUnit("ms"))
if err != nil {
return nil, err
otel.Handle(err)
}

return &SinksMW{
next: s,
duration: sinkDuration,
pluginName: pluginName,
recipeName: recipeName,
}, nil
return func(s plugins.Syncer) plugins.Syncer {
return &Sinks{
next: s,
duration: sinkDuration,
pluginName: pluginName,
recipeName: recipeName,
}
}
}

func (mw *SinksMW) Init(ctx context.Context, cfg plugins.Config) error {
func (mw *Sinks) Init(ctx context.Context, cfg plugins.Config) error {
return mw.next.Init(ctx, cfg)
}

func (mw *SinksMW) Info() plugins.Info {
func (mw *Sinks) Info() plugins.Info {
return mw.next.Info()
}

func (mw *SinksMW) Validate(cfg plugins.Config) error {
func (mw *Sinks) Validate(cfg plugins.Config) error {
return mw.next.Validate(cfg)
}

func (mw *SinksMW) Close() error {
func (mw *Sinks) Close() error {
return mw.next.Close()
}

func (mw *SinksMW) Sink(ctx context.Context, batch []models.Record) (err error) {
func (mw *Sinks) Sink(ctx context.Context, batch []models.Record) (err error) {
defer func(start time.Time) {
mw.duration.Record(ctx,
time.Since(start).Milliseconds(),
Expand Down
Loading

0 comments on commit 8f3a769

Please sign in to comment.