Skip to content

Commit

Permalink
Try to align runs with frequency
Browse files Browse the repository at this point in the history
When the agent restarts, we might end up in a situation where we are
running a check too often. For example, if a check is configured to run
once every 10 minutes, and the check ran 1 minute ago, if we run
immediately, we will have two executions within that 10 minute window.

Since we have to publish samples once every two minutes, we cannot wait
for 9 minutes, because we would end up with a gap in the data. Instead,
we run immediately to avoid that.

But if the check ran 9 minutes ago, we can align with the expectation by
waiting for 1 minute instead of a random value. Presumably the check ran
9 minutes ago, and the sample was replicated 7, 5, 3 and 1 minutes ago.
If we wait for 1 minute, we would end up running the check when it was
expected to run.

In order to actually fix this issue the agents would have to persist
data across runs. It might be possible to do this by offloading
publishing to another service.

Fixes: #739
Signed-off-by: Marcelo E. Magallon <marcelo.magallon@grafana.com>
  • Loading branch information
mem committed Jul 3, 2024
1 parent d798772 commit 3f8bb1b
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 8 deletions.
52 changes: 44 additions & 8 deletions internal/scraper/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,14 +216,8 @@ func (s *Scraper) Run(ctx context.Context) {
// TODO(mem): keep count of the number of successive errors and
// collect logs if threshold is reached.

var (
frequency = ms(s.check.Frequency)
offset = ms(s.check.Offset)
)

if offset == 0 {
offset = randDuration(min(frequency, maxPublishInterval))
}
frequency := ms(s.check.Frequency)
offset := computeOffset(ms(s.check.Offset), frequency, timeFromNs(s.check.Created), time.Now())

scrapeHandler := scrapeHandler{scraper: s}

Expand Down Expand Up @@ -334,6 +328,48 @@ func ms(n int64) time.Duration {
return time.Duration(n) * time.Millisecond
}

// timeFromNs converts a timestamp expressed as a floating point number of
// nanoseconds since the Unix epoch into a time.Time.
//
// The value is split into whole seconds and a nanosecond remainder, both
// derived from the same math.Mod result so that they always agree with each
// other. math.Mod returns a remainder carrying the sign of ns, so for
// timestamps before the epoch the remainder is moved into [0, 1e9) and the
// seconds are decremented accordingly; otherwise e.g. -1ns would be turned
// into -1.000000001s.
func timeFromNs(ns float64) time.Time {
	const nsPerSec = 1e9

	// rem is exact and has the same sign as ns; ns-rem is therefore a whole
	// number of seconds (truncated towards zero). Round only absorbs the
	// floating point error of the division.
	rem := math.Mod(ns, nsPerSec)
	sec := int64(math.Round((ns - rem) / nsPerSec))

	if rem < 0 {
		sec--
		rem += nsPerSec
	}

	// time.Unix accepts nsec outside [0, 999999999], so rem rounding up to
	// exactly 1e9 is still handled correctly.
	return time.Unix(sec, int64(rem))
}

// computeOffset returns how long to wait before the first execution of a
// check.
//
// offset is the configured offset (zero means "not configured"), frequency is
// the period of the check, t0 is the reference time from which runs are
// counted (the caller passes the check's creation time) and now is the
// current time. frequency is expected to be positive: a zero frequency causes
// a division by zero.
//
// If less than one period has elapsed since t0, the configured offset is
// returned, or a random value below min(frequency, maxPublishInterval) if no
// offset was configured. Otherwise the result is the time remaining until the
// next multiple of frequency after t0, provided that is at most
// maxPublishInterval; if it is longer, a random value below
// maxPublishInterval is returned instead.
func computeOffset(offset, frequency time.Duration, t0, now time.Time) time.Duration {
	elapsed := now.Sub(t0)

	if elapsed < frequency {
		// The check was created less than the frequency ago, we should
		// start running it right away.
		if offset != 0 {
			return offset
		}

		return randDuration(min(frequency, maxPublishInterval))
	}

	// The check was created more than the frequency ago, so we need to
	// compute the time until the next time the check should run.
	//
	// elapsed%frequency is the time since the most recent scheduled run,
	// so subtracting it from the frequency gives the time remaining until
	// the next one. Everything is computed on time.Duration values so that
	// the run count and the remaining time use the same precision; mixing
	// millisecond-truncated timestamps with nanosecond ones can produce a
	// negative result or skip a whole period close to a run boundary. The
	// result is always in (0, frequency].
	timeUntilNextRun := frequency - elapsed%frequency

	if timeUntilNextRun <= maxPublishInterval {
		return timeUntilNextRun
	}

	// The reason why we need to ignore the computed offset is that the
	// check ran in the past, and it's possible that it was filling the
	// data with repeated samples that we no longer have access to. We
	// cannot wait until the next run because that might be a long time
	// from now, creating a gap in the data. Instead we wait for a random
	// value that avoids creating gaps (assuming the last published sample
	// was recent).
	return randDuration(maxPublishInterval)
}

// randDuration returns a pseudo-random duration in the half-open interval
// [0, d).
//
// rand.Int63n panics when its argument is not positive, so a zero or negative
// d yields 0 (no jitter) instead of crashing the caller.
func randDuration(d time.Duration) time.Duration {
	if d <= 0 {
		return 0
	}

	return time.Duration(rand.Int63n(int64(d)))
}
Expand Down
110 changes: 110 additions & 0 deletions internal/scraper/scraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1991,3 +1991,113 @@ func TestTickWithOffset(t *testing.T) {
})
}
}

// TestTimeFromNs checks that timeFromNs turns a float64 number of nanoseconds
// since the Unix epoch into the corresponding time.Time.
func TestTimeFromNs(t *testing.T) {
	type testcase struct {
		ns       float64
		expected time.Time
	}

	// A float64 cannot hold every int64 nanosecond count exactly, so results
	// are compared at microsecond resolution, allowing a difference of one
	// microsecond.
	const toleranceMicros = 1

	cases := map[string]testcase{
		"zero": {
			ns:       0,
			expected: time.Unix(0, 0),
		},
		"one": {
			ns:       1,
			expected: time.Unix(0, 1),
		},
		"2020-01-01 00:00:00.000000000": {
			ns:       1577836800 * 1e9,
			expected: time.Unix(1577836800, 0),
		},
		"2024-07-02 21:21:50.123456768": {
			ns:       1719955310*1e9 + 123456789,
			expected: time.Unix(1719955310, 123456789),
		},
		"2262-04-11 23:47:15.999999999": {
			// Just below the largest nanosecond count that fits in an int64.
			ns:       9223372035*1e9 + 999999999,
			expected: time.Unix(9223372035, 999999999),
		},
	}

	for name, c := range cases {
		t.Run(name, func(t *testing.T) {
			got := timeFromNs(c.ns)
			require.InDelta(t, c.expected.UnixMicro(), got.UnixMicro(), toleranceMicros)
		})
	}
}

// TestComputeOffset exercises computeOffset against a fixed reference time t0.
//
// An expected value of zero marks the cases where computeOffset returns a
// random duration; for those only the bounds of the result can be checked.
func TestComputeOffset(t *testing.T) {
	t0 := time.Unix(1_000_000, 0)

	testcases := map[string]struct {
		frequency time.Duration
		offset    time.Duration
		now       time.Time
		expected  time.Duration
	}{
		"zero": {
			offset:    0,
			frequency: 60 * time.Second,
			now:       t0.Add(0),
			expected:  0,
		},
		"1s": {
			offset:    1 * time.Second,
			frequency: 60 * time.Second,
			now:       t0.Add(0),
			expected:  1 * time.Second,
		},
		"30s": {
			offset:    30 * time.Second,
			frequency: 60 * time.Second,
			now:       t0.Add(0),
			expected:  30 * time.Second,
		},
		"created 100 seconds ago": {
			offset:    0 * time.Second,
			frequency: 60 * time.Second,
			now:       t0.Add(100 * time.Second),
			expected:  20 * time.Second, // 100 - 60 = 40 -> 60 - 40 = 20
		},
		"created 1000 seconds ago": {
			offset:    0 * time.Second,
			frequency: 60 * time.Second,
			now:       t0.Add(1000 * time.Second),
			expected:  20 * time.Second, // 1000 / 60 = 16 -> 1000 - 60 * 16 = 40 -> 60 - 40 = 20
		},
		"slow check": {
			offset:    0 * time.Second,
			frequency: 5 * time.Minute,
			now:       t0.Add(1000 * time.Minute),
			expected:  0,
		},
		"slow check close to next run": {
			offset:    0 * time.Second,
			frequency: 5 * time.Minute,
			now:       t0.Add(999 * time.Minute),
			expected:  1 * time.Minute,
		},
		"slow check just ran": {
			offset:    0 * time.Second,
			frequency: 5 * time.Minute,
			now:       t0.Add(1001 * time.Minute),
			expected:  0,
		},
	}

	for name, tc := range testcases {
		t.Run(name, func(t *testing.T) {
			actual := computeOffset(tc.offset, tc.frequency, t0, tc.now)
			if tc.expected != 0 {
				require.Equal(t, tc.expected, actual)
				return
			}

			// Random result: it must never be negative and must not
			// exceed the publish interval.
			require.GreaterOrEqual(t, actual, time.Duration(0))
			require.LessOrEqual(t, actual, maxPublishInterval)
		})
	}
}

0 comments on commit 3f8bb1b

Please sign in to comment.