scrape: v0.51 expose staleness disabling #35

Merged
11 changes: 11 additions & 0 deletions scrape/manager.go
@@ -317,3 +317,14 @@ func (m *Manager) TargetsDroppedCounts() map[string]int {
}
return counts
}

// DisableEndOfRunStalenessMarkers disables the end-of-run staleness markers for the provided targets in the given
// targetSet. When end-of-run staleness is disabled for a target, no staleness markers will be written for its
// series when the target goes away.
func (m *Manager) DisableEndOfRunStalenessMarkers(targetSet string, targets []*Target) {
m.mtxScrape.Lock()
defer m.mtxScrape.Unlock()
if pool, ok := m.scrapePools[targetSet]; ok {
pool.disableEndOfRunStalenessMarkers(targets)
}
}
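
A minimal usage sketch (not part of the PR), assuming a configured Manager m whose scrape pools are already running; the job name "one" is just an example:

// Disable end-of-run staleness markers for every target currently active
// in the "one" scrape pool: when these targets later go away, no staleness
// markers will be written for their series.
m.DisableEndOfRunStalenessMarkers("one", m.TargetsActive()["one"])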
51 changes: 51 additions & 0 deletions scrape/manager_test.go
@@ -20,6 +20,7 @@ import (
"net/http/httptest"
"net/url"
"os"
"slices"
"strconv"
"sync"
"testing"
@@ -857,3 +858,53 @@ func getResultFloats(app *collectResultAppender, expectedMetricName string) (res
}
return result
}

func TestManagerDisableEndOfRunStalenessMarkers(t *testing.T) {
configForJob := func(jobName string) *config.ScrapeConfig {
return &config.ScrapeConfig{
JobName: jobName,
ScrapeInterval: model.Duration(1 * time.Minute),
ScrapeTimeout: model.Duration(1 * time.Minute),
ScrapeProtocols: config.DefaultScrapeProtocols,
}
}

cfg := &config.Config{ScrapeConfigs: []*config.ScrapeConfig{
configForJob("one"),
configForJob("two"),
}}

m, err := NewManager(&Options{}, nil, &nopAppendable{}, prometheus.NewRegistry())
require.NoError(t, err)
defer m.Stop()
require.NoError(t, m.ApplyConfig(cfg))

// Pass targets to the manager.
tgs := map[string][]*targetgroup.Group{
"one": {{Targets: []model.LabelSet{{"__address__": "h1"}, {"__address__": "h2"}, {"__address__": "h3"}}}},
"two": {{Targets: []model.LabelSet{{"__address__": "h4"}}}},
}
m.updateTsets(tgs)
m.reload()

activeTargets := m.TargetsActive()
targetsToDisable := []*Target{
activeTargets["one"][0],
activeTargets["one"][2],
NewTarget(labels.FromStrings("__address__", "h4"), labels.EmptyLabels(), nil), // Not an active target in job "one".
}

// Disable end-of-run staleness markers for some targets.
m.DisableEndOfRunStalenessMarkers("one", targetsToDisable)
// Disabling markers for an unknown scrape pool should be a no-op.
m.DisableEndOfRunStalenessMarkers("non-existent-job", targetsToDisable)

// Check that end-of-run staleness markers are disabled for exactly the requested targets.
for _, group := range []string{"one", "two"} {
for _, tg := range activeTargets[group] {
loop := m.scrapePools[group].loops[tg.hash()].(*scrapeLoop)
expectedDisabled := slices.Contains(targetsToDisable, tg)
require.Equal(t, expectedDisabled, loop.disabledEndOfRunStalenessMarkers.Load())
}
}
}
22 changes: 19 additions & 3 deletions scrape/scrape.go
@@ -35,6 +35,7 @@ import (
config_util "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/common/version"
"go.uber.org/atomic"

"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery/targetgroup"
@@ -543,6 +544,16 @@ func (sp *scrapePool) refreshTargetLimitErr() error {
return nil
}

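// disableEndOfRunStalenessMarkers disables end-of-run staleness markers for the given targets,
// matched by label hash, if they are currently scraped by this pool.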
func (sp *scrapePool) disableEndOfRunStalenessMarkers(targets []*Target) {
sp.mtx.Lock()
defer sp.mtx.Unlock()
for i := range targets {
if l, ok := sp.loops[targets[i].hash()]; ok {
l.disableEndOfRunStalenessMarkers()
}
}
}

func verifyLabelLimits(lset labels.Labels, limits *labelLimits) error {
if limits == nil {
return nil
@@ -840,7 +851,7 @@ type scrapeLoop struct {
cancel func()
stopped chan struct{}

- disabledEndOfRunStalenessMarkers bool
+ disabledEndOfRunStalenessMarkers atomic.Bool

reportExtraMetrics bool
appendMetadataToWAL bool
@@ -1247,7 +1258,7 @@

close(sl.stopped)

- if !sl.disabledEndOfRunStalenessMarkers {
+ if !sl.disabledEndOfRunStalenessMarkers.Load() {
sl.endOfRunStaleness(last, ticker, sl.interval)
}
}
@@ -1408,6 +1419,11 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int
case <-time.After(interval / 10):
}

// Check if end-of-run staleness markers have been disabled while we were waiting.
if sl.disabledEndOfRunStalenessMarkers.Load() {
return
}

// Call sl.append again with an empty scrape to trigger stale markers.
// If the target has since been recreated and scraped, the
// stale markers will be out of order and ignored.
@@ -1442,7 +1458,7 @@ func (sl *scrapeLoop) stop() {
}

func (sl *scrapeLoop) disableEndOfRunStalenessMarkers() {
- sl.disabledEndOfRunStalenessMarkers = true
+ sl.disabledEndOfRunStalenessMarkers.Store(true)
}

func (sl *scrapeLoop) getCache() *scrapeCache {
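Aside: the flag changed from bool to atomic.Bool (go.uber.org/atomic) because it is now written by disableEndOfRunStalenessMarkers from an arbitrary caller goroutine while the scrape loop's own goroutine reads it in run and endOfRunStaleness; a plain bool here would be a data race. A self-contained sketch of the same pattern, independent of the PR code:

package main

import (
	"fmt"

	"go.uber.org/atomic"
)

func main() {
	var disabled atomic.Bool // zero value is false and ready to use.

	done := make(chan struct{})
	go func() {
		// Writer: another goroutine flips the flag, as
		// Manager.DisableEndOfRunStalenessMarkers does for a scrape loop.
		disabled.Store(true)
		close(done)
	}()
	<-done

	// Reader: the loop goroutine checks the flag without holding a lock.
	if !disabled.Load() {
		fmt.Println("would write end-of-run staleness markers")
	}
}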
45 changes: 45 additions & 0 deletions scrape/scrape_test.go
@@ -3734,3 +3734,48 @@ scrape_configs:
require.Equal(t, expectedSchema, h.Schema)
}
}

func TestScrapeLoopDisableStalenessMarkerInjection(t *testing.T) {
var (
loopDone = make(chan struct{}, 1)
appender = &collectResultAppender{}
scraper = &testScraper{}
app = func(ctx context.Context) storage.Appender { return appender }
)

sl := newBasicScrapeLoop(t, context.Background(), scraper, app, 10*time.Millisecond)
// Count scrapes and terminate loop after 2 scrapes.
numScrapes, maxScrapes := 0, 2
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
numScrapes++
if numScrapes >= maxScrapes {
go sl.stop()
<-sl.ctx.Done()
}
if _, err := w.Write([]byte("metric_a 42\n")); err != nil {
return err
}
return ctx.Err()
}

go func() {
sl.run(nil)
loopDone <- struct{}{}
}()

// Disable end of run staleness markers.
sl.disableEndOfRunStalenessMarkers()

select {
case <-loopDone:
case <-time.After(5 * time.Second):
t.Fatalf("Scrape loop didn't stop.")
}

// No stale markers should be appended, since they were disabled.
for _, s := range appender.resultFloats {
if value.IsStaleNaN(s.f) {
t.Fatalf("Got stale NaN samples while end of run staleness is disabled: %x", math.Float64bits(s.f))
}
}
}