Skip to content
This repository was archived by the owner on Dec 11, 2024. It is now read-only.

Commit f57293a

Browse files
authored
Merge pull request #6 from usedatabrew/feat/DAT-221/processor-metrics
feat(DAT-221/metrics): added foundation for processor metrics
2 parents 90b66ef + 89fa9bd commit f57293a

File tree

5 files changed

+167
-10
lines changed

5 files changed

+167
-10
lines changed

internal/metrics/influx/plugin.go

Lines changed: 90 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ type Plugin struct {
1515
sinkErrorsCounter metrics.Counter
1616
sourceErrorsCounter metrics.Counter
1717

18+
procMetrics map[string][]metrics.Counter
19+
procExecutionTimeMetrics map[string]metrics.Gauge
20+
1821
client *influxdb3.Client
1922
writeOptions influxdb3.WriteOptions
2023

@@ -26,14 +29,16 @@ type Plugin struct {
2629

2730
func NewPlugin(config Config) (*Plugin, error) {
2831
plugin := &Plugin{
29-
groupName: config.GroupName,
30-
pipelineId: config.PipelineId,
31-
orgId: config.Org,
32-
bucket: config.Bucket,
33-
sentCounter: metrics.NewCounter(),
34-
receivedCounter: metrics.NewCounter(),
35-
sinkErrorsCounter: metrics.NewCounter(),
36-
sourceErrorsCounter: metrics.NewCounter(),
32+
groupName: config.GroupName,
33+
pipelineId: config.PipelineId,
34+
orgId: config.Org,
35+
bucket: config.Bucket,
36+
sentCounter: metrics.NewCounter(),
37+
receivedCounter: metrics.NewCounter(),
38+
sinkErrorsCounter: metrics.NewCounter(),
39+
sourceErrorsCounter: metrics.NewCounter(),
40+
procMetrics: map[string][]metrics.Counter{},
41+
procExecutionTimeMetrics: map[string]metrics.Gauge{},
3742
}
3843
plugin.receivedCounter.Clear()
3944
plugin.sentCounter.Clear()
@@ -79,9 +84,86 @@ func (p *Plugin) IncrementSourceErrCounter() {
7984
p.sourceErrorsCounter.Inc(1)
8085
}
8186

87+
func (p *Plugin) SetProcessorExecutionTime(proc string, time int64) {
88+
p.procExecutionTimeMetrics[proc].Update(time)
89+
}
90+
91+
func (p *Plugin) IncrementProcessorDroppedMessages(proc string) {
92+
p.procMetrics[proc][0].Inc(0)
93+
}
94+
95+
func (p *Plugin) IncrementProcessorReceivedMessages(proc string) {
96+
p.procMetrics[proc][0].Inc(1)
97+
}
98+
99+
func (p *Plugin) IncrementProcessorSentMessages(proc string) {
100+
p.procMetrics[proc][0].Inc(2)
101+
}
102+
103+
func (p *Plugin) RegisterProcessors(processors []string) {
104+
for _, proc := range processors {
105+
p.procExecutionTimeMetrics[proc] = metrics.NewGauge()
106+
p.procMetrics[proc] = []metrics.Counter{
107+
// dropped/filtered messages metrics
108+
metrics.NewCounter(),
109+
// sent messages metrics
110+
metrics.NewCounter(),
111+
// received metrics
112+
metrics.NewCounter(),
113+
}
114+
}
115+
}
116+
82117
func (p *Plugin) flushMetrics() {
83118
t := time.Now()
84119

120+
for proc, counters := range p.procMetrics {
121+
122+
executionTimePoint := influxdb3.NewPointWithMeasurement("blink_data").
123+
SetTag("group", p.groupName).
124+
SetTag("pipeline", strconv.Itoa(p.pipelineId)).
125+
SetTag("processor", proc).
126+
SetField("execution_time", p.procExecutionTimeMetrics).
127+
SetTimestamp(t)
128+
129+
if err := p.client.WritePointsWithOptions(context.Background(), &p.writeOptions, executionTimePoint); err != nil {
130+
panic(err)
131+
}
132+
133+
filteredMessagesPoint := influxdb3.NewPointWithMeasurement("blink_data").
134+
SetTag("group", p.groupName).
135+
SetTag("pipeline", strconv.Itoa(p.pipelineId)).
136+
SetTag("processor", proc).
137+
SetField("dropped_messages", counters[0].Count()).
138+
SetTimestamp(t)
139+
140+
if err := p.client.WritePointsWithOptions(context.Background(), &p.writeOptions, filteredMessagesPoint); err != nil {
141+
panic(err)
142+
}
143+
144+
sentMessagesPoint := influxdb3.NewPointWithMeasurement("blink_data").
145+
SetTag("group", p.groupName).
146+
SetTag("pipeline", strconv.Itoa(p.pipelineId)).
147+
SetTag("processor", proc).
148+
SetField("sent_messages", counters[1].Count()).
149+
SetTimestamp(t)
150+
151+
if err := p.client.WritePointsWithOptions(context.Background(), &p.writeOptions, sentMessagesPoint); err != nil {
152+
panic(err)
153+
}
154+
155+
receivedMessagesPoint := influxdb3.NewPointWithMeasurement("blink_data").
156+
SetTag("group", p.groupName).
157+
SetTag("pipeline", strconv.Itoa(p.pipelineId)).
158+
SetTag("processor", proc).
159+
SetField("sent_messages", counters[2].Count()).
160+
SetTimestamp(t)
161+
162+
if err := p.client.WritePointsWithOptions(context.Background(), &p.writeOptions, receivedMessagesPoint); err != nil {
163+
panic(err)
164+
}
165+
}
166+
85167
point := influxdb3.NewPointWithMeasurement("blink_data").
86168
SetTag("group", p.groupName).
87169
SetTag("pipeline", strconv.Itoa(p.pipelineId)).

internal/metrics/metrics.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,11 @@ type Metrics interface {
55
IncrementSentCounter()
66
IncrementSourceErrCounter()
77
IncrementSinkErrCounter()
8+
9+
RegisterProcessors(processors []string)
10+
11+
SetProcessorExecutionTime(proc string, time int64)
12+
IncrementProcessorDroppedMessages(proc string)
13+
IncrementProcessorReceivedMessages(proc string)
14+
IncrementProcessorSentMessages(proc string)
815
}

internal/metrics/prometheus/plugin.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package prometheus
22

33
import (
4+
"fmt"
45
"github.com/InfluxCommunity/influxdb3-go/influxdb3"
56
"github.com/prometheus/client_golang/prometheus"
67
"github.com/prometheus/client_golang/prometheus/promauto"
@@ -14,6 +15,9 @@ type Plugin struct {
1415

1516
writeOptions influxdb3.WriteOptions
1617

18+
procCounters map[string][]prometheus.Counter
19+
procExecutionTimeGauges map[string]prometheus.Gauge
20+
1721
groupName string
1822
pipelineId int
1923
}
@@ -38,6 +42,8 @@ func NewPlugin(config Config) (*Plugin, error) {
3842
Name: "source_errors",
3943
Help: "The total number of errors from source connector",
4044
}),
45+
procCounters: map[string][]prometheus.Counter{},
46+
procExecutionTimeGauges: map[string]prometheus.Gauge{},
4147
}
4248
return plugin, nil
4349
}
@@ -57,3 +63,42 @@ func (p *Plugin) IncrementSinkErrCounter() {
5763
func (p *Plugin) IncrementSourceErrCounter() {
5864
p.sourceErrorsCounter.Inc()
5965
}
66+
67+
func (p *Plugin) RegisterProcessors(processors []string) {
68+
for _, proc := range processors {
69+
p.procExecutionTimeGauges[proc] = promauto.NewGauge(prometheus.GaugeOpts{
70+
Name: fmt.Sprintf("%s_execution_time", proc),
71+
Help: "Time taken to process message",
72+
})
73+
p.procCounters[proc] = []prometheus.Counter{
74+
promauto.NewCounter(prometheus.CounterOpts{
75+
Name: fmt.Sprintf("%s_dropped_messages", proc),
76+
Help: "Messages that were dropped, filtered by the processor",
77+
}),
78+
promauto.NewCounter(prometheus.CounterOpts{
79+
Name: fmt.Sprintf("%s_sent_messages", proc),
80+
Help: "The total number of messages send to the next sink/processor plugin",
81+
}),
82+
promauto.NewCounter(prometheus.CounterOpts{
83+
Name: fmt.Sprintf("%s_received_messages", proc),
84+
Help: "The total number of messages send to the sink plugin",
85+
}),
86+
}
87+
}
88+
}
89+
90+
func (p *Plugin) SetProcessorExecutionTime(proc string, time int64) {
91+
p.procExecutionTimeGauges[proc].Set(float64(time))
92+
}
93+
94+
func (p *Plugin) IncrementProcessorDroppedMessages(proc string) {
95+
p.procCounters[proc][0].Inc()
96+
}
97+
98+
func (p *Plugin) IncrementProcessorReceivedMessages(proc string) {
99+
p.procCounters[proc][1].Inc()
100+
}
101+
102+
func (p *Plugin) IncrementProcessorSentMessages(proc string) {
103+
p.procCounters[proc][2].Inc()
104+
}

public/stream/processor_wrapper.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,30 @@ package stream
22

33
import (
44
"blink/internal/message"
5+
"blink/internal/metrics"
56
"blink/internal/processors"
67
"blink/internal/processors/openai"
78
"blink/internal/schema"
89
"blink/internal/sources"
910
"blink/internal/stream_context"
1011
"errors"
12+
"time"
1113
)
1214

1315
// ProcessorWrapper wraps plan sink writer plugin in order to
1416
// measure performance, build proper configuration and control the context
1517
type ProcessorWrapper struct {
1618
processorDriver processors.DataProcessor
1719
ctx *stream_context.Context
20+
metrics metrics.Metrics
21+
procDriver string
1822
}
1923

2024
func NewProcessorWrapper(pluginType processors.ProcessorDriver, config interface{}, appctx *stream_context.Context) ProcessorWrapper {
21-
loader := ProcessorWrapper{}
25+
loader := ProcessorWrapper{
26+
metrics: appctx.Metrics,
27+
procDriver: string(pluginType),
28+
}
2229
loader.ctx = appctx
2330
loadedDriver, err := loader.LoadDriver(pluginType, config)
2431
if err != nil {
@@ -29,7 +36,16 @@ func NewProcessorWrapper(pluginType processors.ProcessorDriver, config interface
2936
}
3037

3138
func (p *ProcessorWrapper) Process(msg sources.MessageEvent) (message.Message, error) {
32-
return p.processorDriver.Process(p.ctx.GetContext(), msg.Message)
39+
p.metrics.IncrementProcessorReceivedMessages(p.procDriver)
40+
execStart := time.Now()
41+
procMsg, err := p.processorDriver.Process(p.ctx.GetContext(), msg.Message)
42+
if err == nil {
43+
p.metrics.IncrementProcessorSentMessages(string(p.procDriver))
44+
}
45+
46+
execEnd := time.Since(execStart)
47+
p.metrics.SetProcessorExecutionTime(p.procDriver, execEnd.Milliseconds())
48+
return procMsg, err
3349
}
3450

3551
func (p *ProcessorWrapper) EvolveSchema(s *schema.StreamSchemaObj) error {

public/stream/stream.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,18 @@ type Stream struct {
2828
func InitFromConfig(config config.Configuration) (*Stream, error) {
2929
streamContext := stream_context.CreateContext()
3030
streamContext.Logger.Info("Bootstrapping blink Stream-ETL")
31+
32+
var processorList []string
33+
for _, proc := range config.Processors {
34+
processorList = append(processorList, string(proc.Driver))
35+
}
3136
if config.Service.InfluxEnabled {
3237
metrics, err := loadInfluxMetrics(config.Service.Influx)
3338
if err != nil {
3439
streamContext.Logger.WithPrefix("Metrics").Error("failed to load influx metrics")
3540
return nil, err
3641
}
42+
metrics.RegisterProcessors(processorList)
3743
streamContext.SetMetrics(metrics)
3844
streamContext.Logger.WithPrefix("Metrics").Info("Component has been loaded")
3945
} else {
@@ -44,6 +50,7 @@ func InitFromConfig(config config.Configuration) (*Stream, error) {
4450
streamContext.Logger.WithPrefix("Metrics").Error("failed to load local prometheus metrics")
4551
return nil, err
4652
}
53+
metrics.RegisterProcessors(processorList)
4754
streamContext.SetMetrics(metrics)
4855
streamContext.Logger.WithPrefix("Metrics").Info("Component has been loaded")
4956
}

0 commit comments

Comments
 (0)