Skip to content

Commit 27cde87

Browse files
authored
[Heartbeat] Adjust State loader to only retry for failed requests and not for 4xx (#37981)
* only retry when the status is 5xx * remove test AAA comments * add changelog * correct changelog modification * fix ES query * change error handling strategy * do not retry when there is malformed data * improve retry mechanism * improve log message * improve changelog * fix log format
1 parent 0361e30 commit 27cde87

File tree

6 files changed

+160
-15
lines changed

6 files changed

+160
-15
lines changed

CHANGELOG.next.asciidoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,9 @@ fields added to events containing the Beats version. {pull}37553[37553]
100100

101101
*Heartbeat*
102102

103+
- Fix panics when parsing dereferencing invalid parsed url. {pull}34702[34702]
104+
- Fix setuid root when running under cgroups v2. {pull}37794[37794]
105+
- Adjust State loader to only retry when response code status is 5xx {pull}37981[37981]
103106

104107
*Metricbeat*
105108

heartbeat/monitors/wrappers/monitorstate/esloader.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,21 @@ import (
3232

3333
var DefaultDataStreams = "synthetics-*,heartbeat-*"
3434

35+
type LoaderError struct {
36+
err error
37+
Retry bool
38+
}
39+
40+
func (e LoaderError) Error() string {
41+
return e.err.Error()
42+
}
43+
3544
func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation *config.LocationWithID) StateLoader {
3645
if indexPattern == "" {
3746
// Should never happen, but if we ever make a coding error...
3847
logp.L().Warn("ES state loader initialized with no index pattern, will not load states from ES")
3948
return NilStateLoader
4049
}
41-
4250
return func(sf stdfields.StdMonitorFields) (*State, error) {
4351
var runFromID string
4452
if sf.RunFrom != nil {
@@ -74,10 +82,11 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation
7482
},
7583
},
7684
}
77-
7885
status, body, err := esc.Request("POST", strings.Join([]string{"/", indexPattern, "/", "_search", "?size=1"}, ""), "", nil, reqBody)
7986
if err != nil || status > 299 {
80-
return nil, fmt.Errorf("error executing state search for %s in loc=%s: %w", sf.ID, runFromID, err)
87+
sErr := fmt.Errorf("error executing state search for %s in loc=%s: %w", sf.ID, runFromID, err)
88+
retry := shouldRetry(status)
89+
return nil, LoaderError{err: sErr, Retry: retry}
8190
}
8291

8392
type stateHits struct {
@@ -94,7 +103,8 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation
94103
sh := stateHits{}
95104
err = json.Unmarshal(body, &sh)
96105
if err != nil {
97-
return nil, fmt.Errorf("could not unmarshal state hits for %s: %w", sf.ID, err)
106+
sErr := fmt.Errorf("could not unmarshal state hits for %s: %w", sf.ID, err)
107+
return nil, LoaderError{err: sErr, Retry: false}
98108
}
99109

100110
if len(sh.Hits.Hits) == 0 {
@@ -107,3 +117,7 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation
107117
return state, nil
108118
}
109119
}
120+
121+
func shouldRetry(status int) bool {
122+
return status >= 500
123+
}

heartbeat/monitors/wrappers/monitorstate/esloader_test.go

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ package monitorstate
2121

2222
import (
2323
"fmt"
24+
"io"
25+
"net/http"
26+
"strings"
2427
"testing"
2528
"time"
2629

@@ -33,6 +36,7 @@ import (
3336
"github.com/elastic/beats/v7/heartbeat/config"
3437
"github.com/elastic/beats/v7/heartbeat/esutil"
3538
"github.com/elastic/beats/v7/heartbeat/monitors/stdfields"
39+
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
3640
"github.com/elastic/beats/v7/libbeat/processors/util"
3741
)
3842

@@ -51,7 +55,7 @@ func TestStatesESLoader(t *testing.T) {
5155

5256
monID := etc.createTestMonitorStateInES(t, testStatus)
5357
// Since we've continued this state it should register the initial state
54-
ms := etc.tracker.GetCurrentState(monID)
58+
ms := etc.tracker.GetCurrentState(monID, RetryConfig{})
5559
require.True(t, ms.StartedAt.After(testStart.Add(-time.Nanosecond)), "timestamp for new state is off")
5660
requireMSStatusCount(t, ms, testStatus, 1)
5761

@@ -89,8 +93,61 @@ func TestStatesESLoader(t *testing.T) {
8993
}
9094
}
9195

96+
func TestMakeESLoaderError(t *testing.T) {
97+
tests := []struct {
98+
name string
99+
statusCode int
100+
expected bool
101+
}{
102+
{
103+
name: "should return a retryable error",
104+
statusCode: http.StatusInternalServerError,
105+
expected: true,
106+
},
107+
{
108+
name: "should not return a retryable error",
109+
statusCode: http.StatusNotFound,
110+
expected: false,
111+
},
112+
{
113+
name: "should not return a retryable error when handling malformed data",
114+
statusCode: http.StatusOK,
115+
expected: false,
116+
},
117+
}
118+
for _, test := range tests {
119+
t.Run(test.name, func(t *testing.T) {
120+
etc := newESTestContext(t)
121+
etc.ec.HTTP = fakeHTTPClient{respStatus: test.statusCode}
122+
loader := MakeESLoader(etc.ec, "fakeIndexPattern", etc.location)
123+
124+
_, err := loader(stdfields.StdMonitorFields{})
125+
126+
var loaderError LoaderError
127+
require.ErrorAs(t, err, &loaderError)
128+
require.Equal(t, loaderError.Retry, test.expected)
129+
})
130+
}
131+
}
132+
133+
type fakeHTTPClient struct {
134+
respStatus int
135+
}
136+
137+
func (fc fakeHTTPClient) Do(req *http.Request) (resp *http.Response, err error) {
138+
return &http.Response{
139+
StatusCode: fc.respStatus,
140+
Body: io.NopCloser(strings.NewReader("test response")),
141+
}, nil
142+
}
143+
144+
func (fc fakeHTTPClient) CloseIdleConnections() {
145+
// noop
146+
}
147+
92148
type esTestContext struct {
93149
namespace string
150+
ec *eslegclient.Connection
94151
esc *elasticsearch.Client
95152
loader StateLoader
96153
tracker *Tracker
@@ -106,10 +163,12 @@ func newESTestContext(t *testing.T) *esTestContext {
106163
}
107164
namespace, _ := uuid.NewV4()
108165
esc := IntegApiClient(t)
166+
ec := IntegES(t)
109167
etc := &esTestContext{
110168
namespace: namespace.String(),
111169
esc: esc,
112-
loader: IntegESLoader(t, fmt.Sprintf("synthetics-*-%s", namespace.String()), location),
170+
ec: ec,
171+
loader: IntegESLoader(t, ec, fmt.Sprintf("synthetics-*-%s", namespace.String()), location),
113172
location: location,
114173
}
115174

heartbeat/monitors/wrappers/monitorstate/testutil.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ import (
3333

3434
// Helpers for tests here and elsewhere
3535

36-
func IntegESLoader(t *testing.T, indexPattern string, location *config.LocationWithID) StateLoader {
37-
return MakeESLoader(IntegES(t), indexPattern, location)
36+
func IntegESLoader(t *testing.T, esc *eslegclient.Connection, indexPattern string, location *config.LocationWithID) StateLoader {
37+
return MakeESLoader(esc, indexPattern, location)
3838
}
3939

4040
func IntegES(t *testing.T) (esc *eslegclient.Connection) {

heartbeat/monitors/wrappers/monitorstate/tracker.go

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func (t *Tracker) RecordStatus(sf stdfields.StdMonitorFields, newStatus StateSta
6262
t.mtx.Lock()
6363
defer t.mtx.Unlock()
6464

65-
state := t.GetCurrentState(sf)
65+
state := t.GetCurrentState(sf, RetryConfig{})
6666
if state == nil {
6767
state = newMonitorState(sf, newStatus, 0, t.flappingEnabled)
6868
logp.L().Infof("initializing new state for monitor %s: %s", sf.ID, state.String())
@@ -75,36 +75,56 @@ func (t *Tracker) RecordStatus(sf stdfields.StdMonitorFields, newStatus StateSta
7575
}
7676

7777
func (t *Tracker) GetCurrentStatus(sf stdfields.StdMonitorFields) StateStatus {
78-
s := t.GetCurrentState(sf)
78+
s := t.GetCurrentState(sf, RetryConfig{})
7979
if s == nil {
8080
return StatusEmpty
8181
}
8282
return s.Status
8383
}
8484

85-
func (t *Tracker) GetCurrentState(sf stdfields.StdMonitorFields) (state *State) {
85+
type RetryConfig struct {
86+
attempts int
87+
waitFn func() time.Duration
88+
}
89+
90+
func (t *Tracker) GetCurrentState(sf stdfields.StdMonitorFields, rc RetryConfig) (state *State) {
8691
if state, ok := t.states[sf.ID]; ok {
8792
return state
8893
}
8994

90-
tries := 3
95+
// Default number of attempts
96+
attempts := 3
97+
if rc.attempts != 0 {
98+
attempts = rc.attempts
99+
}
100+
91101
var loadedState *State
92102
var err error
93-
for i := 0; i < tries; i++ {
103+
var i int
104+
for i = 0; i < attempts; i++ {
94105
loadedState, err = t.stateLoader(sf)
95106
if err == nil {
96107
if loadedState != nil {
97108
logp.L().Infof("loaded previous state for monitor %s: %s", sf.ID, loadedState.String())
98109
}
99110
break
100111
}
112+
var loaderError LoaderError
113+
if errors.As(err, &loaderError) && !loaderError.Retry {
114+
logp.L().Warnf("could not load last externally recorded state: %v", loaderError)
115+
break
116+
}
101117

118+
// Default sleep time
102119
sleepFor := (time.Duration(i*i) * time.Second) + (time.Duration(rand.Intn(500)) * time.Millisecond)
103-
logp.L().Warnf("could not load last externally recorded state, will retry again in %d milliseconds: %w", sleepFor.Milliseconds(), err)
120+
if rc.waitFn != nil {
121+
sleepFor = rc.waitFn()
122+
}
123+
logp.L().Warnf("could not load last externally recorded state, will retry again in %d milliseconds: %v", sleepFor.Milliseconds(), err)
104124
time.Sleep(sleepFor)
105125
}
106126
if err != nil {
107-
logp.L().Warnf("could not load prior state from elasticsearch after %d attempts, will create new state for monitor: %s", tries, sf.ID)
127+
logp.L().Warnf("could not load prior state from elasticsearch after %d attempts, will create new state for monitor: %s", i+1, sf.ID)
108128
}
109129

110130
if loadedState != nil {

heartbeat/monitors/wrappers/monitorstate/tracker_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package monitorstate
1919

2020
import (
21+
"errors"
2122
"testing"
2223
"time"
2324

@@ -131,3 +132,51 @@ func TestDeferredStateLoader(t *testing.T) {
131132
resState, _ = dsl(stdfields.StdMonitorFields{})
132133
require.Equal(t, stateA, resState)
133134
}
135+
136+
func TestStateLoaderRetry(t *testing.T) {
137+
// While testing the sleep time between retries should be negligible
138+
waitFn := func() time.Duration {
139+
return time.Microsecond
140+
}
141+
142+
tests := []struct {
143+
name string
144+
retryable bool
145+
rc RetryConfig
146+
expectedCalls int
147+
}{
148+
{
149+
"should retry 3 times when fails with retryable error",
150+
true,
151+
RetryConfig{waitFn: waitFn},
152+
3,
153+
},
154+
{
155+
"should not retry when fails with non-retryable error",
156+
false,
157+
RetryConfig{waitFn: waitFn},
158+
1,
159+
},
160+
{
161+
"should honour the configured number of attempts when fails with retryable error",
162+
true,
163+
RetryConfig{attempts: 5, waitFn: waitFn},
164+
5,
165+
},
166+
}
167+
168+
for _, tt := range tests {
169+
t.Run(tt.name, func(t *testing.T) {
170+
calls := 0
171+
errorStateLoader := func(_ stdfields.StdMonitorFields) (*State, error) {
172+
calls += 1
173+
return nil, LoaderError{err: errors.New("test error"), Retry: tt.retryable}
174+
}
175+
176+
mst := NewTracker(errorStateLoader, true)
177+
mst.GetCurrentState(stdfields.StdMonitorFields{}, tt.rc)
178+
179+
require.Equal(t, calls, tt.expectedCalls)
180+
})
181+
}
182+
}

0 commit comments

Comments
 (0)