Skip to content

Commit 536037b

Browse files
authored
Limit to single inflight package syncing operation (#289)
This PR revives work done in previous PRs (#118, #120) to make sure that only a single package syncing operation is ever in flight and also adds a test. The previous PRs did not account for needing to also protect `initStatuses` and `SetPackageStatuses`, so that's why the Lock and Unlock statements are not just paired in doSync. If you think the intent would be clearer using a sync.WaitGroup, let me know. The new test makes sure that the mutex correctly protects the local storage; if we comment out the calls to Lock/Unlock and run the test with the `-race` flag we can see the race condition taking place <details> ``` # Without using the mutex we can see the race condition of messages sent in parallel $ go test -run=TestPackageUpdatesInParallel -v -race -count=1 === RUN TestPackageUpdatesInParallel ================== WARNING: DATA RACE Write at 0x00c00003cdb0 by goroutine 10: github.com/open-telemetry/opamp-go/client/internal.(*InMemPackagesStore).SetLastReportedStatuses() /Users/tpaschalis/GitRepos/opamp-go/client/internal/inmempackagestore.go:95 +0x34 github.com/open-telemetry/opamp-go/client/internal.(*packagesSyncer).reportStatuses() /Users/tpaschalis/GitRepos/opamp-go/client/internal/packagessyncer.go:335 +0x78 github.com/open-telemetry/opamp-go/client/internal.(*packagesSyncer).syncPackage() /Users/tpaschalis/GitRepos/opamp-go/client/internal/packagessyncer.go:199 +0x4dc github.com/open-telemetry/opamp-go/client/internal.(*packagesSyncer).doSync() /Users/tpaschalis/GitRepos/opamp-go/client/internal/packagessyncer.go:132 +0x3b0 github.com/open-telemetry/opamp-go/client/internal.(*packagesSyncer).Sync.gowrap1() /Users/tpaschalis/GitRepos/opamp-go/client/internal/packagessyncer.go:69 +0x4c Previous read at 0x00c00003cdb0 by goroutine 8: github.com/open-telemetry/opamp-go/client/internal.(*InMemPackagesStore).LastReportedStatuses() /Users/tpaschalis/GitRepos/opamp-go/client/internal/inmempackagestore.go:91 +0x30 github.com/open-telemetry/opamp-go/client/internal.(*packagesSyncer).initStatuses() /Users/tpaschalis/GitRepos/opamp-go/client/internal/packagessyncer.go:88 +0x64 github.com/open-telemetry/opamp-go/client/internal.(*packagesSyncer).Sync() /Users/tpaschalis/GitRepos/opamp-go/client/internal/packagessyncer.go:59 +0x78 github.com/open-telemetry/opamp-go/client/internal.TestPackageUpdatesInParallel.func1() /Users/tpaschalis/GitRepos/opamp-go/client/internal/httpsender_test.go:185 +0x68 github.com/open-telemetry/opamp-go/client/types.CallbacksStruct.OnMessage() /Users/tpaschalis/GitRepos/opamp-go/client/types/callbacks.go:161 +0x84 github.com/open-telemetry/opamp-go/client/types.(*CallbacksStruct).OnMessage() <autogenerated>:1 +0x20 github.com/open-telemetry/opamp-go/client/internal.(*receivedProcessor).ProcessReceivedMessage() /Users/tpaschalis/GitRepos/opamp-go/client/internal/receivedprocessor.go:160 +0xe94 github.com/open-telemetry/opamp-go/client/internal.TestPackageUpdatesInParallel.func2() /Users/tpaschalis/GitRepos/opamp-go/client/internal/httpsender_test.go:197 +0x4d0 Goroutine 10 (running) created at: github.com/open-telemetry/opamp-go/client/internal.(*packagesSyncer).Sync() /Users/tpaschalis/GitRepos/opamp-go/client/internal/packagessyncer.go:69 +0x15c github.com/open-telemetry/opamp-go/client/internal.TestPackageUpdatesInParallel.func1() /Users/tpaschalis/GitRepos/opamp-go/client/internal/httpsender_test.go:185 +0x68 github.com/open-telemetry/opamp-go/client/types.CallbacksStruct.OnMessage() /Users/tpaschalis/GitRepos/opamp-go/client/types/callbacks.go:161 +0x84 github.com/open-telemetry/opamp-go/client/types.(*CallbacksStruct).OnMessage() <autogenerated>:1 +0x20 github.com/open-telemetry/opamp-go/client/internal.(*receivedProcessor).ProcessReceivedMessage() /Users/tpaschalis/GitRepos/opamp-go/client/internal/receivedprocessor.go:160 +0xe94 github.com/open-telemetry/opamp-go/client/internal.TestPackageUpdatesInParallel.func3() /Users/tpaschalis/GitRepos/opamp-go/client/internal/httpsender_test.go:216 +0x4d0 Goroutine 8 (finished) created at: github.com/open-telemetry/opamp-go/client/internal.TestPackageUpdatesInParallel() /Users/tpaschalis/GitRepos/opamp-go/client/internal/httpsender_test.go:196 +0x4b0 testing.tRunner() /opt/homebrew/Cellar/go/1.22.2/libexec/src/testing/testing.go:1689 +0x180 testing.(*T).Run.gowrap1() /opt/homebrew/Cellar/go/1.22.2/libexec/src/testing/testing.go:1742 +0x40 ================== ================== WARNING: DATA RACE Write at 0x00c0000999e0 by goroutine 11: runtime.mapaccess2_faststr() /opt/homebrew/Cellar/go/1.22.2/libexec/src/runtime/map_faststr.go:108 +0x42c github.com/open-telemetry/opamp-go/client/internal.(*InMemPackagesStore).CreatePackage() /Users/tpaschalis/GitRepos/opamp-go/client/internal/inmempackagestore.go:50 +0x74 github.com/open-telemetry/opamp-go/client/internal.(*packagesSyncer).syncPackage() /Users/tpaschalis/GitRepos/opamp-go/client/internal/packagessyncer.go:203 +0x528 github.com/open-telemetry/opamp-go/client/internal.(*packagesSyncer).doSync() /Users/tpaschalis/GitRepos/opamp-go/client/internal/packagessyncer.go:132 +0x3b0 github.com/open-telemetry/opamp-go/client/internal.(*packagesSyncer).Sync.gowrap1() /Users/tpaschalis/GitRepos/opamp-go/client/internal/packagessyncer.go:69 +0x4c Previous read at 0x00c0000999e0 by goroutine 10: runtime.mapdelete() /opt/homebrew/Cellar/go/1.22.2/libexec/src/runtime/map.go:696 +0x43c github.com/open-telemetry/opamp-go/client/internal.(*InMemPackagesStore).Packages() /Users/tpaschalis/GitRepos/opamp-go/client/internal/inmempackagestore.go:36 +0x64 github.com/open-telemetry/opamp-go/client/internal.(*packagesSyncer).deleteUnneededLocalPackages() /Users/tpaschalis/GitRepos/opamp-go/client/internal/packagessyncer.go:303 +0x58 github.com/open-telemetry/opamp-go/client/internal.(*packagesSyncer).doSync() /Users/tpaschalis/GitRepos/opamp-go/client/internal/packagessyncer.go:125 +0x1b4 github.com/open-telemetry/opamp-go/client/internal.(*packagesSyncer).Sync.gowrap1() /Users/tpaschalis/GitRepos/opamp-go/client/internal/packagessyncer.go:69 +0x4c Goroutine 11 (running) created at: github.com/open-telemetry/opamp-go/client/internal.(*packagesSyncer).Sync() /Users/tpaschalis/GitRepos/opamp-go/client/internal/packagessyncer.go:69 +0x15c github.com/open-telemetry/opamp-go/client/internal.TestPackageUpdatesInParallel.func1() /Users/tpaschalis/GitRepos/opamp-go/client/internal/httpsender_test.go:185 +0x68 github.com/open-telemetry/opamp-go/client/types.CallbacksStruct.OnMessage() /Users/tpaschalis/GitRepos/opamp-go/client/types/callbacks.go:161 +0x84 github.com/open-telemetry/opamp-go/client/types.(*CallbacksStruct).OnMessage() <autogenerated>:1 +0x20 github.com/open-telemetry/opamp-go/client/internal.(*receivedProcessor).ProcessReceivedMessage() /Users/tpaschalis/GitRepos/opamp-go/client/internal/receivedprocessor.go:160 +0xe94 github.com/open-telemetry/opamp-go/client/internal.TestPackageUpdatesInParallel.func2() /Users/tpaschalis/GitRepos/opamp-go/client/internal/httpsender_test.go:197 +0x4d0 Goroutine 10 (running) created at: github.com/open-telemetry/opamp-go/client/internal.(*packagesSyncer).Sync() /Users/tpaschalis/GitRepos/opamp-go/client/internal/packagessyncer.go:69 +0x15c github.com/open-telemetry/opamp-go/client/internal.TestPackageUpdatesInParallel.func1() /Users/tpaschalis/GitRepos/opamp-go/client/internal/httpsender_test.go:185 +0x68 github.com/open-telemetry/opamp-go/client/types.CallbacksStruct.OnMessage() /Users/tpaschalis/GitRepos/opamp-go/client/types/callbacks.go:161 +0x84 github.com/open-telemetry/opamp-go/client/types.(*CallbacksStruct).OnMessage() <autogenerated>:1 +0x20 github.com/open-telemetry/opamp-go/client/internal.(*receivedProcessor).ProcessReceivedMessage() /Users/tpaschalis/GitRepos/opamp-go/client/internal/receivedprocessor.go:160 +0xe94 github.com/open-telemetry/opamp-go/client/internal.TestPackageUpdatesInParallel.func3() /Users/tpaschalis/GitRepos/opamp-go/client/internal/httpsender_test.go:216 +0x4d0 ================== ================== WARNING: DATA RACE Write at 0x00c00003cdb0 by goroutine 11: github.com/open-telemetry/opamp-go/client/internal.(*InMemPackagesStore).SetLastReportedStatuses() /Users/tpaschalis/GitRepos/opamp-go/client/internal/inmempackagestore.go:95 +0x34 github.com/open-telemetry/opamp-go/client/internal.(*packagesSyncer).reportStatuses() /Users/tpaschalis/GitRepos/opamp-go/client/internal/packagessyncer.go:335 +0x78 github.com/open-telemetry/opamp-go/client/internal.(*packagesSyncer).syncPackage() /Users/tpaschalis/GitRepos/opamp-go/client/internal/packagessyncer.go:229 +0x780 github.com/open-telemetry/opamp-go/client/internal.(*packagesSyncer).doSync() /Users/tpaschalis/GitRepos/opamp-go/client/internal/packagessyncer.go:132 +0x3b0 github.com/open-telemetry/opamp-go/client/internal.(*packagesSyncer).Sync.gowrap1() /Users/tpaschalis/GitRepos/opamp-go/client/internal/packagessyncer.go:69 +0x4c Previous write at 0x00c00003cdb0 by goroutine 10: github.com/open-telemetry/opamp-go/client/internal.(*InMemPackagesStore).SetLastReportedStatuses() /Users/tpaschalis/GitRepos/opamp-go/client/internal/inmempackagestore.go:95 +0x34 github.com/open-telemetry/opamp-go/client/internal.(*packagesSyncer).reportStatuses() /Users/tpaschalis/GitRepos/opamp-go/client/internal/packagessyncer.go:335 +0x78 github.com/open-telemetry/opamp-go/client/internal.(*packagesSyncer).syncPackage() /Users/tpaschalis/GitRepos/opamp-go/client/internal/packagessyncer.go:229 +0x780 github.com/open-telemetry/opamp-go/client/internal.(*packagesSyncer).doSync() /Users/tpaschalis/GitRepos/opamp-go/client/internal/packagessyncer.go:132 +0x3b0 github.com/open-telemetry/opamp-go/client/internal.(*packagesSyncer).Sync.gowrap1() /Users/tpaschalis/GitRepos/opamp-go/client/internal/packagessyncer.go:69 +0x4c Goroutine 11 (running) created at: github.com/open-telemetry/opamp-go/client/internal.(*packagesSyncer).Sync() /Users/tpaschalis/GitRepos/opamp-go/client/internal/packagessyncer.go:69 +0x15c github.com/open-telemetry/opamp-go/client/internal.TestPackageUpdatesInParallel.func1() /Users/tpaschalis/GitRepos/opamp-go/client/internal/httpsender_test.go:185 +0x68 github.com/open-telemetry/opamp-go/client/types.CallbacksStruct.OnMessage() /Users/tpaschalis/GitRepos/opamp-go/client/types/callbacks.go:161 +0x84 github.com/open-telemetry/opamp-go/client/types.(*CallbacksStruct).OnMessage() <autogenerated>:1 +0x20 github.com/open-telemetry/opamp-go/client/internal.(*receivedProcessor).ProcessReceivedMessage() /Users/tpaschalis/GitRepos/opamp-go/client/internal/receivedprocessor.go:160 +0xe94 github.com/open-telemetry/opamp-go/client/internal.TestPackageUpdatesInParallel.func2() /Users/tpaschalis/GitRepos/opamp-go/client/internal/httpsender_test.go:197 +0x4d0 Goroutine 10 (running) created at: github.com/open-telemetry/opamp-go/client/internal.(*packagesSyncer).Sync() /Users/tpaschalis/GitRepos/opamp-go/client/internal/packagessyncer.go:69 +0x15c github.com/open-telemetry/opamp-go/client/internal.TestPackageUpdatesInParallel.func1() /Users/tpaschalis/GitRepos/opamp-go/client/internal/httpsender_test.go:185 +0x68 github.com/open-telemetry/opamp-go/client/types.CallbacksStruct.OnMessage() /Users/tpaschalis/GitRepos/opamp-go/client/types/callbacks.go:161 +0x84 github.com/open-telemetry/opamp-go/client/types.(*CallbacksStruct).OnMessage() <autogenerated>:1 +0x20 github.com/open-telemetry/opamp-go/client/internal.(*receivedProcessor).ProcessReceivedMessage() /Users/tpaschalis/GitRepos/opamp-go/client/internal/receivedprocessor.go:160 +0xe94 github.com/open-telemetry/opamp-go/client/internal.TestPackageUpdatesInParallel.func3() /Users/tpaschalis/GitRepos/opamp-go/client/internal/httpsender_test.go:216 +0x4d0 ================== ================== WARNING: DATA RACE Write at 0x00c00003cdb0 by goroutine 10: github.com/open-telemetry/opamp-go/client/internal.(*InMemPackagesStore).SetLastReportedStatuses() /Users/tpaschalis/GitRepos/opamp-go/client/internal/inmempackagestore.go:95 +0x34 github.com/open-telemetry/opamp-go/client/internal.(*packagesSyncer).reportStatuses() /Users/tpaschalis/GitRepos/opamp-go/client/internal/packagessyncer.go:335 +0x78 github.com/open-telemetry/opamp-go/client/internal.(*packagesSyncer).doSync() /Users/tpaschalis/GitRepos/opamp-go/client/internal/packagessyncer.go:151 +0x69c github.com/open-telemetry/opamp-go/client/internal.(*packagesSyncer).Sync.gowrap1() /Users/tpaschalis/GitRepos/opamp-go/client/internal/packagessyncer.go:69 +0x4c Previous write at 0x00c00003cdb0 by goroutine 11: github.com/open-telemetry/opamp-go/client/internal.(*InMemPackagesStore).SetLastReportedStatuses() /Users/tpaschalis/GitRepos/opamp-go/client/internal/inmempackagestore.go:95 +0x34 github.com/open-telemetry/opamp-go/client/internal.(*packagesSyncer).reportStatuses() /Users/tpaschalis/GitRepos/opamp-go/client/internal/packagessyncer.go:335 +0x78 github.com/open-telemetry/opamp-go/client/internal.(*packagesSyncer).doSync() /Users/tpaschalis/GitRepos/opamp-go/client/internal/packagessyncer.go:151 +0x69c github.com/open-telemetry/opamp-go/client/internal.(*packagesSyncer).Sync.gowrap1() /Users/tpaschalis/GitRepos/opamp-go/client/internal/packagessyncer.go:69 +0x4c Goroutine 10 (running) created at: github.com/open-telemetry/opamp-go/client/internal.(*packagesSyncer).Sync() /Users/tpaschalis/GitRepos/opamp-go/client/internal/packagessyncer.go:69 +0x15c github.com/open-telemetry/opamp-go/client/internal.TestPackageUpdatesInParallel.func1() /Users/tpaschalis/GitRepos/opamp-go/client/internal/httpsender_test.go:185 +0x68 github.com/open-telemetry/opamp-go/client/types.CallbacksStruct.OnMessage() /Users/tpaschalis/GitRepos/opamp-go/client/types/callbacks.go:161 +0x84 github.com/open-telemetry/opamp-go/client/types.(*CallbacksStruct).OnMessage() <autogenerated>:1 +0x20 github.com/open-telemetry/opamp-go/client/internal.(*receivedProcessor).ProcessReceivedMessage() /Users/tpaschalis/GitRepos/opamp-go/client/internal/receivedprocessor.go:160 +0xe94 github.com/open-telemetry/opamp-go/client/internal.TestPackageUpdatesInParallel.func3() /Users/tpaschalis/GitRepos/opamp-go/client/internal/httpsender_test.go:216 +0x4d0 Goroutine 11 (running) created at: github.com/open-telemetry/opamp-go/client/internal.(*packagesSyncer).Sync() /Users/tpaschalis/GitRepos/opamp-go/client/internal/packagessyncer.go:69 +0x15c github.com/open-telemetry/opamp-go/client/internal.TestPackageUpdatesInParallel.func1() /Users/tpaschalis/GitRepos/opamp-go/client/internal/httpsender_test.go:185 +0x68 github.com/open-telemetry/opamp-go/client/types.CallbacksStruct.OnMessage() /Users/tpaschalis/GitRepos/opamp-go/client/types/callbacks.go:161 +0x84 github.com/open-telemetry/opamp-go/client/types.(*CallbacksStruct).OnMessage() <autogenerated>:1 +0x20 github.com/open-telemetry/opamp-go/client/internal.(*receivedProcessor).ProcessReceivedMessage() /Users/tpaschalis/GitRepos/opamp-go/client/internal/receivedprocessor.go:160 +0xe94 github.com/open-telemetry/opamp-go/client/internal.TestPackageUpdatesInParallel.func2() /Users/tpaschalis/GitRepos/opamp-go/client/internal/httpsender_test.go:197 +0x4d0 ================== testing.go:1398: race detected during execution of test --- FAIL: TestPackageUpdatesInParallel (0.10s) FAIL exit status 1 FAIL github.com/open-telemetry/opamp-go/client/internal 0.286s ``` </details> Fixes #84
1 parent 182ce37 commit 536037b

File tree

10 files changed

+251
-7
lines changed

10 files changed

+251
-7
lines changed

client/httpclient.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ func (c *httpClient) runUntilStopped(ctx context.Context) {
131131
&c.common.ClientSyncedState,
132132
c.common.PackagesStateProvider,
133133
c.common.Capabilities,
134+
&c.common.PackageSyncMutex,
134135
)
135136
}
136137

client/internal/clientcommon.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ type ClientCommon struct {
4141
// PackagesStateProvider provides access to the local state of packages.
4242
PackagesStateProvider types.PackagesStateProvider
4343

44+
// PackageSyncMutex makes sure only one package syncing operation happens at a time.
45+
PackageSyncMutex sync.Mutex
46+
4447
// The transport-specific sender.
4548
sender Sender
4649

client/internal/httpsender.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"fmt"
1010
"io"
1111
"net/http"
12+
"sync"
1213
"sync/atomic"
1314
"time"
1415

@@ -91,10 +92,11 @@ func (h *HTTPSender) Run(
9192
clientSyncedState *ClientSyncedState,
9293
packagesStateProvider types.PackagesStateProvider,
9394
capabilities protobufs.AgentCapabilities,
95+
packageSyncMutex *sync.Mutex,
9496
) {
9597
h.url = url
9698
h.callbacks = callbacks
97-
h.receiveProcessor = newReceivedProcessor(h.logger, callbacks, h, clientSyncedState, packagesStateProvider, capabilities)
99+
h.receiveProcessor = newReceivedProcessor(h.logger, callbacks, h, clientSyncedState, packagesStateProvider, capabilities, packageSyncMutex)
98100

99101
for {
100102
pollingTimer := time.NewTimer(time.Millisecond * time.Duration(atomic.LoadInt64(&h.pollingIntervalMs)))

client/internal/httpsender_test.go

Lines changed: 125 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ func TestRequestInstanceUidFlagReset(t *testing.T) {
183183
clientSyncedState := &ClientSyncedState{}
184184
clientSyncedState.SetFlags(protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid)
185185
capabilities := protobufs.AgentCapabilities_AgentCapabilities_Unspecified
186-
sender.receiveProcessor = newReceivedProcessor(&sharedinternal.NopLogger{}, sender.callbacks, sender, clientSyncedState, nil, capabilities)
186+
sender.receiveProcessor = newReceivedProcessor(&sharedinternal.NopLogger{}, sender.callbacks, sender, clientSyncedState, nil, capabilities, new(sync.Mutex))
187187

188188
// If we process a message with a nil AgentIdentification, or an incorrect NewInstanceUid.
189189
sender.receiveProcessor.ProcessReceivedMessage(ctx,
@@ -208,3 +208,127 @@ func TestRequestInstanceUidFlagReset(t *testing.T) {
208208
assert.Equal(t, sender.receiveProcessor.clientSyncedState.flags, protobufs.AgentToServerFlags_AgentToServerFlags_Unspecified)
209209
cancel()
210210
}
211+
212+
func TestPackageUpdatesInParallel(t *testing.T) {
213+
ctx, cancel := context.WithCancel(context.Background())
214+
localPackageState := NewInMemPackagesStore()
215+
sender := NewHTTPSender(&sharedinternal.NopLogger{})
216+
blockSyncCh := make(chan struct{})
217+
doneCh := make([]<-chan struct{}, 0)
218+
219+
// Use `ch` to simulate blocking behavior on the second call to Sync().
220+
// This will allow both Sync() calls to be called in parallel; we will
221+
// first make sure that both are inflight before manually releasing the
222+
// channel so that both go through in sequence.
223+
localPackageState.onAllPackagesHash = func() {
224+
if localPackageState.lastReportedStatuses != nil {
225+
<-blockSyncCh
226+
}
227+
}
228+
229+
var messages atomic.Int32
230+
var mux sync.Mutex
231+
sender.callbacks = types.CallbacksStruct{
232+
OnMessageFunc: func(ctx context.Context, msg *types.MessageData) {
233+
err := msg.PackageSyncer.Sync(ctx)
234+
assert.NoError(t, err)
235+
messages.Add(1)
236+
doneCh = append(doneCh, msg.PackageSyncer.Done())
237+
},
238+
}
239+
240+
clientSyncedState := &ClientSyncedState{}
241+
capabilities := protobufs.AgentCapabilities_AgentCapabilities_AcceptsPackages
242+
sender.receiveProcessor = newReceivedProcessor(&sharedinternal.NopLogger{}, sender.callbacks, sender, clientSyncedState, localPackageState, capabilities, &mux)
243+
244+
sender.receiveProcessor.ProcessReceivedMessage(ctx,
245+
&protobufs.ServerToAgent{
246+
PackagesAvailable: &protobufs.PackagesAvailable{
247+
Packages: map[string]*protobufs.PackageAvailable{
248+
"package1": {
249+
Type: protobufs.PackageType_PackageType_TopLevel,
250+
Version: "1.0.0",
251+
File: &protobufs.DownloadableFile{
252+
DownloadUrl: "foo",
253+
ContentHash: []byte{4, 5},
254+
},
255+
Hash: []byte{1, 2, 3},
256+
},
257+
},
258+
AllPackagesHash: []byte{1, 2, 3, 4, 5},
259+
},
260+
})
261+
sender.receiveProcessor.ProcessReceivedMessage(ctx,
262+
&protobufs.ServerToAgent{
263+
PackagesAvailable: &protobufs.PackagesAvailable{
264+
Packages: map[string]*protobufs.PackageAvailable{
265+
"package22": {
266+
Type: protobufs.PackageType_PackageType_TopLevel,
267+
Version: "1.0.0",
268+
File: &protobufs.DownloadableFile{
269+
DownloadUrl: "bar",
270+
ContentHash: []byte{4, 5},
271+
},
272+
Hash: []byte{1, 2, 3},
273+
},
274+
},
275+
AllPackagesHash: []byte{1, 2, 3, 4, 5},
276+
},
277+
})
278+
279+
// Make sure that both Sync calls have gone through _before_ releasing the first one.
280+
// This means that they're both called in parallel, and that the race
281+
// detector would always report a race condition, but proper locking makes
282+
// sure that's not the case.
283+
assert.Eventually(t, func() bool {
284+
return messages.Load() == 2
285+
}, 2*time.Second, 100*time.Millisecond, "both messages must have been processed successfully")
286+
287+
// Release the second Sync call so it can continue and wait for both of them to complete.
288+
blockSyncCh <- struct{}{}
289+
<-doneCh[0]
290+
<-doneCh[1]
291+
292+
cancel()
293+
}
294+
295+
func TestPackageUpdatesWithError(t *testing.T) {
296+
ctx, cancel := context.WithCancel(context.Background())
297+
sender := NewHTTPSender(&sharedinternal.NopLogger{})
298+
299+
// We'll pass in a nil PackageStateProvider to force the Sync call to return with an error.
300+
localPackageState := types.PackagesStateProvider(nil)
301+
var messages atomic.Int32
302+
var mux sync.Mutex
303+
sender.callbacks = types.CallbacksStruct{
304+
OnMessageFunc: func(ctx context.Context, msg *types.MessageData) {
305+
// Make sure the call to Sync will return an error due to a nil PackageStateProvider
306+
err := msg.PackageSyncer.Sync(ctx)
307+
assert.Error(t, err)
308+
messages.Add(1)
309+
},
310+
}
311+
312+
clientSyncedState := &ClientSyncedState{}
313+
314+
capabilities := protobufs.AgentCapabilities_AgentCapabilities_AcceptsPackages
315+
sender.receiveProcessor = newReceivedProcessor(&sharedinternal.NopLogger{}, sender.callbacks, sender, clientSyncedState, localPackageState, capabilities, &mux)
316+
317+
// Send two messages in parallel.
318+
sender.receiveProcessor.ProcessReceivedMessage(ctx,
319+
&protobufs.ServerToAgent{
320+
PackagesAvailable: &protobufs.PackagesAvailable{},
321+
})
322+
sender.receiveProcessor.ProcessReceivedMessage(ctx,
323+
&protobufs.ServerToAgent{
324+
PackagesAvailable: &protobufs.PackagesAvailable{},
325+
})
326+
327+
// Make sure that even though the call to Sync errored out early, the lock
328+
// was still released properly for both messages to be processed.
329+
assert.Eventually(t, func() bool {
330+
return messages.Load() == 2
331+
}, 5*time.Second, 100*time.Millisecond, "both messages must have been processed successfully")
332+
333+
cancel()
334+
}

client/internal/inmempackagestore.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ type InMemPackagesStore struct {
1515
fileContents map[string][]byte
1616
fileHashes map[string][]byte
1717
lastReportedStatuses *protobufs.PackageStatuses
18+
19+
onAllPackagesHash func()
1820
}
1921

2022
var _ types.PackagesStateProvider = (*InMemPackagesStore)(nil)
@@ -28,6 +30,9 @@ func NewInMemPackagesStore() *InMemPackagesStore {
2830
}
2931

3032
func (l *InMemPackagesStore) AllPackagesHash() ([]byte, error) {
33+
if l.onAllPackagesHash != nil {
34+
l.onAllPackagesHash()
35+
}
3136
return l.allPackagesHash, nil
3237
}
3338

client/internal/packagessyncer.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"errors"
77
"fmt"
88
"net/http"
9+
"sync"
910

1011
"github.com/open-telemetry/opamp-go/client/types"
1112
"github.com/open-telemetry/opamp-go/protobufs"
@@ -20,6 +21,7 @@ type packagesSyncer struct {
2021
sender Sender
2122

2223
statuses *protobufs.PackageStatuses
24+
mux *sync.Mutex
2325
doneCh chan struct{}
2426
}
2527

@@ -30,6 +32,7 @@ func NewPackagesSyncer(
3032
sender Sender,
3133
clientSyncedState *ClientSyncedState,
3234
packagesStateProvider types.PackagesStateProvider,
35+
mux *sync.Mutex,
3336
) *packagesSyncer {
3437
return &packagesSyncer{
3538
logger: logger,
@@ -38,6 +41,7 @@ func NewPackagesSyncer(
3841
clientSyncedState: clientSyncedState,
3942
localState: packagesStateProvider,
4043
doneCh: make(chan struct{}),
44+
mux: mux,
4145
}
4246
}
4347

@@ -49,15 +53,24 @@ func (s *packagesSyncer) Sync(ctx context.Context) error {
4953
}()
5054

5155
// Prepare package statuses.
56+
// Grab a lock to make sure that package statuses are not overriden by
57+
// another call to Sync running in parallel.
58+
// In case Sync returns early with an error, take care of unlocking the
59+
// mutex in this goroutine; otherwise it will be unlocked at the end
60+
// of the sync operation.
61+
s.mux.Lock()
5262
if err := s.initStatuses(); err != nil {
63+
s.mux.Unlock()
5364
return err
5465
}
5566

5667
if err := s.clientSyncedState.SetPackageStatuses(s.statuses); err != nil {
68+
s.mux.Unlock()
5769
return err
5870
}
5971

60-
// Now do the actual syncing in the background.
72+
// Now do the actual syncing in the background and release the lock from
73+
// inside of the goroutine.
6174
go s.doSync(ctx)
6275

6376
return nil
@@ -99,6 +112,10 @@ func (s *packagesSyncer) initStatuses() error {
99112

100113
// doSync performs the actual syncing process.
101114
func (s *packagesSyncer) doSync(ctx context.Context) {
115+
// Once doSync returns in a separate goroutine, make sure to release the
116+
// mutex so that a new syncing process can take place.
117+
defer s.mux.Unlock()
118+
102119
hash, err := s.localState.AllPackagesHash()
103120
if err != nil {
104121
s.logger.Errorf(ctx, "Package syncing failed: %V", err)

client/internal/receivedprocessor.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package internal
33
import (
44
"context"
55
"fmt"
6+
"sync"
67

78
"github.com/open-telemetry/opamp-go/client/types"
89
"github.com/open-telemetry/opamp-go/protobufs"
@@ -24,6 +25,9 @@ type receivedProcessor struct {
2425

2526
packagesStateProvider types.PackagesStateProvider
2627

28+
// packageSyncMutex protects against multiple package syncing operations at the same time.
29+
packageSyncMutex *sync.Mutex
30+
2731
// Agent's capabilities defined at Start() time.
2832
capabilities protobufs.AgentCapabilities
2933
}
@@ -35,6 +39,7 @@ func newReceivedProcessor(
3539
clientSyncedState *ClientSyncedState,
3640
packagesStateProvider types.PackagesStateProvider,
3741
capabilities protobufs.AgentCapabilities,
42+
packageSyncMutex *sync.Mutex,
3843
) receivedProcessor {
3944
return receivedProcessor{
4045
logger: logger,
@@ -43,6 +48,7 @@ func newReceivedProcessor(
4348
clientSyncedState: clientSyncedState,
4449
packagesStateProvider: packagesStateProvider,
4550
capabilities: capabilities,
51+
packageSyncMutex: packageSyncMutex,
4652
}
4753
}
4854

@@ -122,6 +128,7 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro
122128
r.sender,
123129
r.clientSyncedState,
124130
r.packagesStateProvider,
131+
r.packageSyncMutex,
125132
)
126133
} else {
127134
r.logger.Debugf(ctx, "Ignoring PackagesAvailable, agent does not have AcceptsPackages capability")

client/internal/wsreceiver.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package internal
33
import (
44
"context"
55
"fmt"
6+
"sync"
67

78
"github.com/gorilla/websocket"
89
"github.com/open-telemetry/opamp-go/client/types"
@@ -32,13 +33,14 @@ func NewWSReceiver(
3233
clientSyncedState *ClientSyncedState,
3334
packagesStateProvider types.PackagesStateProvider,
3435
capabilities protobufs.AgentCapabilities,
36+
packageSyncMutex *sync.Mutex,
3537
) *wsReceiver {
3638
w := &wsReceiver{
3739
conn: conn,
3840
logger: logger,
3941
sender: sender,
4042
callbacks: callbacks,
41-
processor: newReceivedProcessor(logger, callbacks, sender, clientSyncedState, packagesStateProvider, capabilities),
43+
processor: newReceivedProcessor(logger, callbacks, sender, clientSyncedState, packagesStateProvider, capabilities, packageSyncMutex),
4244
stopped: make(chan struct{}),
4345
}
4446

0 commit comments

Comments
 (0)