Skip to content

Commit 9929562

Browse files
authored
[processor/deltatocumulative]: timer-based expiry (open-telemetry#31625)
**Description:** Moves from complex preemptive expiry to a plain 1 minute timer **Link to tracking Issue:** open-telemetry#31615 (comment) Resolves open-telemetry#31615
1 parent 2f614ef commit 9929562

File tree

6 files changed

+49
-304
lines changed

6 files changed

+49
-304
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: "bug_fix"
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: "deltatocumulativeprocessor"
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: timer-based expiry
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [31615]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
converts expiry to 1m timer, eliminating a race condition and failing test
20+
21+
# If your change doesn't affect end users or the exported elements of any package,
22+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
23+
# Optional: The change log or logs in which this entry should be included.
24+
# e.g. '[user]' or '[user, api]'
25+
# Include 'user' if the change is relevant to end users.
26+
# Include 'api' if there is a change to a library API.
27+
# Default: '[user]'
28+
change_logs: [user]

processor/deltatocumulativeprocessor/internal/clock/clock.go

Lines changed: 0 additions & 94 deletions
This file was deleted.

processor/deltatocumulativeprocessor/internal/clock/clock_test.go

Lines changed: 0 additions & 32 deletions
This file was deleted.

processor/deltatocumulativeprocessor/internal/streams/expiry.go

Lines changed: 0 additions & 60 deletions
This file was deleted.

processor/deltatocumulativeprocessor/internal/streams/expiry_test.go

Lines changed: 0 additions & 100 deletions
This file was deleted.

processor/deltatocumulativeprocessor/processor.go

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@ import (
77
"context"
88
"errors"
99
"sync"
10+
"time"
1011

1112
"go.opentelemetry.io/collector/component"
1213
"go.opentelemetry.io/collector/consumer"
1314
"go.opentelemetry.io/collector/pdata/pmetric"
1415
"go.opentelemetry.io/collector/processor"
1516
"go.uber.org/zap"
1617

18+
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness"
1719
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"
1820
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta"
1921
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics"
@@ -29,8 +31,8 @@ type Processor struct {
2931
ctx context.Context
3032
cancel context.CancelFunc
3133

32-
aggr streams.Aggregator[data.Number]
33-
exp *streams.Expiry[data.Number]
34+
aggr streams.Aggregator[data.Number]
35+
stale *staleness.Staleness[data.Number]
3436

3537
mtx sync.Mutex
3638
}
@@ -49,32 +51,33 @@ func newProcessor(cfg *Config, log *zap.Logger, next consumer.Metrics) *Processo
4951
dps = delta.New[data.Number]()
5052

5153
if cfg.MaxStale > 0 {
52-
exp := streams.ExpireAfter(dps, cfg.MaxStale)
53-
proc.exp = &exp
54-
dps = &exp
54+
stale := staleness.NewStaleness(cfg.MaxStale, dps)
55+
proc.stale = stale
56+
dps = stale
5557
}
5658

5759
proc.aggr = streams.IntoAggregator(dps)
5860
return &proc
5961
}
6062

6163
func (p *Processor) Start(_ context.Context, _ component.Host) error {
62-
if p.exp != nil {
63-
go func() {
64-
for {
64+
if p.stale == nil {
65+
return nil
66+
}
67+
68+
go func() {
69+
tick := time.NewTicker(time.Minute)
70+
for {
71+
select {
72+
case <-p.ctx.Done():
73+
return
74+
case <-tick.C:
6575
p.mtx.Lock()
66-
next := p.exp.ExpireOldEntries()
76+
p.stale.ExpireOldEntries()
6777
p.mtx.Unlock()
68-
69-
select {
70-
case <-next:
71-
case <-p.ctx.Done():
72-
return
73-
}
7478
}
75-
}()
76-
}
77-
79+
}
80+
}()
7881
return nil
7982
}
8083

0 commit comments

Comments
 (0)