Skip to content

Commit 924ea07

Browse files
Remove race condition when accessing remote bulker map (#4171)
Use the remoteOutputMutex whenever accessing the bulkerMap. Change GetBulkerMap to return a copy of the map so that remote output health will not conflict with adding/removing a bulker from the map.
1 parent cf41f38 commit 924ea07

File tree

3 files changed

+104
-2
lines changed

3 files changed

+104
-2
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Kind can be one of:
2+
# - breaking-change: a change to previously-documented behavior
3+
# - deprecation: functionality that is being removed in a later release
4+
# - bug-fix: fixes a problem in a previous version
5+
# - enhancement: extends functionality but does not break or fix existing behavior
6+
# - feature: new functionality
7+
# - known-issue: problems that we are aware of in a given version
8+
# - security: impacts on the security of a product or a user’s deployment.
9+
# - upgrade: important information for someone upgrading from a prior version
10+
# - other: does not fit into any of the other categories
11+
kind: bug-fix
12+
13+
# Change summary; a 80ish characters long description of the change.
14+
summary: Remove race in remote bulker access
15+
16+
# Long description; in case the summary is not enough to describe the change
17+
# this field accommodate a description without length limits.
18+
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
19+
#description:
20+
21+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
22+
component: fleet-server
23+
24+
# PR URL; optional; the PR number that added the changeset.
25+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
26+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
27+
# Please provide it if you are adding a fragment for a different PR.
28+
#pr: https://github.com/owner/repo/1234
29+
30+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
31+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
32+
issue: https://github.com/elastic/fleet-server/issues/4170

internal/pkg/bulk/bulk_remote_output_test.go

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import (
1212

1313
"github.com/rs/zerolog"
1414
"github.com/stretchr/testify/assert"
15+
16+
testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log"
1517
)
1618

1719
func Test_hasChangedAndUpdateRemoteOutputConfig(t *testing.T) {
@@ -78,9 +80,10 @@ func Test_hasChangedAndUpdateRemoteOutputConfig(t *testing.T) {
7880

7981
for _, tc := range testcases {
8082
t.Run(tc.name, func(t *testing.T) {
83+
log := testlog.SetLogger(t)
8184
bulker := NewBulker(nil, nil)
8285
bulker.remoteOutputConfigMap["remote1"] = tc.cfg
83-
hasChanged := bulker.hasChangedAndUpdateRemoteOutputConfig(zerolog.Nop(), "remote1", tc.newCfg)
86+
hasChanged := bulker.hasChangedAndUpdateRemoteOutputConfig(log, "remote1", tc.newCfg)
8487
assert.Equal(t, tc.changed, hasChanged)
8588
assert.Equal(t, tc.newCfg, bulker.remoteOutputConfigMap["remote1"])
8689
})
@@ -148,3 +151,57 @@ func Test_CreateAndGetBulkerChanged(t *testing.T) {
148151
assert.Nil(t, err)
149152
assert.Equal(t, true, cancelFnCalled)
150153
}
154+
155+
func Benchmark_CreateAndGetBulker(b *testing.B) {
156+
b.Skip("Crashes on remote runner")
157+
ctx, cancel := context.WithCancel(context.Background())
158+
defer cancel()
159+
log := zerolog.Nop()
160+
outputMap := map[string]map[string]any{
161+
"remote1": map[string]any{
162+
"type": "remote_elasticsearch",
163+
"hosts": []interface{}{"https://remote-es:443"},
164+
"service_token": "token1",
165+
},
166+
}
167+
b.Run("new remote bulker", func(b *testing.B) {
168+
bulker := NewBulker(nil, nil)
169+
b.ReportAllocs()
170+
for range b.N {
171+
b.StopTimer()
172+
bulker.bulkerMap = make(map[string]Bulk)
173+
bulker.remoteOutputConfigMap = make(map[string]map[string]any)
174+
b.StartTimer()
175+
176+
bulker.CreateAndGetBulker(ctx, log, "remote1", outputMap)
177+
}
178+
})
179+
b.Run("existing remote bulker", func(b *testing.B) {
180+
bulker := NewBulker(nil, nil)
181+
outputBulker := NewBulker(nil, nil)
182+
bulker.bulkerMap["remote1"] = outputBulker
183+
bulker.remoteOutputConfigMap["remote1"] = outputMap["remote1"]
184+
b.ResetTimer()
185+
b.ReportAllocs()
186+
for range b.N {
187+
bulker.CreateAndGetBulker(ctx, log, "remote1", outputMap)
188+
}
189+
})
190+
b.Run("changed remote bulker", func(b *testing.B) {
191+
b.ReportAllocs()
192+
for range b.N {
193+
b.StopTimer()
194+
bulker := NewBulker(nil, nil)
195+
outputBulker := NewBulker(nil, nil)
196+
bulker.bulkerMap["remote1"] = outputBulker
197+
bulker.remoteOutputConfigMap["remote1"] = map[string]any{
198+
"type": "remote_elasticsearch",
199+
"hosts": []interface{}{"https://remote-es:443"},
200+
"service_token": "wrong token",
201+
}
202+
b.StartTimer()
203+
204+
bulker.CreateAndGetBulker(ctx, log, "remote1", outputMap)
205+
}
206+
})
207+
}

internal/pkg/bulk/engine.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,11 +130,20 @@ func NewBulker(es esapi.Transport, tracer *apm.Tracer, opts ...BulkOpt) *Bulker
130130
}
131131

132132
func (b *Bulker) GetBulker(outputName string) Bulk {
133+
b.remoteOutputMutex.RLock()
134+
defer b.remoteOutputMutex.RUnlock()
133135
return b.bulkerMap[outputName]
134136
}
135137

138+
// GetBulkerMap returns a copy of the remote output bulkers
136139
func (b *Bulker) GetBulkerMap() map[string]Bulk {
137-
return b.bulkerMap
140+
mp := make(map[string]Bulk)
141+
b.remoteOutputMutex.RLock()
142+
for k, v := range b.bulkerMap {
143+
mp[k] = v
144+
}
145+
b.remoteOutputMutex.RUnlock()
146+
return mp
138147
}
139148

140149
func (b *Bulker) CancelFn() context.CancelFunc {
@@ -155,7 +164,9 @@ func (b *Bulker) updateBulkerMap(outputName string, newBulker *Bulker) {
155164
// if changed, stop the existing bulker and create a new one
156165
func (b *Bulker) CreateAndGetBulker(ctx context.Context, zlog zerolog.Logger, outputName string, outputMap map[string]map[string]interface{}) (Bulk, bool, error) {
157166
hasConfigChanged := b.hasChangedAndUpdateRemoteOutputConfig(zlog, outputName, outputMap[outputName])
167+
b.remoteOutputMutex.RLock()
158168
bulker := b.bulkerMap[outputName]
169+
b.remoteOutputMutex.RUnlock()
159170
if bulker != nil && !hasConfigChanged {
160171
return bulker, false, nil
161172
}
@@ -429,6 +440,8 @@ func (b *Bulker) Run(ctx context.Context) error {
429440

430441
// cancelling context of each remote bulker when Run exits
431442
defer func() {
443+
b.remoteOutputMutex.RLock()
444+
defer b.remoteOutputMutex.RUnlock()
432445
for _, bulker := range b.bulkerMap {
433446
bulker.CancelFn()()
434447
}

0 commit comments

Comments
 (0)