Skip to content

Commit 715146e

Browse files
Stop fully when accepting OpAMP connection settings in the agent example (#184)
Fixes #178 I reverted #170 which was unnecessary. We don't really need to call Stop() from callbacks. In fact doing stopping on a separate goroutine is preferable, properly waiting until stopping is complete and then trying new connection settings.
1 parent 13a6fd1 commit 715146e

File tree

10 files changed

+42
-227
lines changed

10 files changed

+42
-227
lines changed

client/client.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,9 @@ type OpAMPClient interface {
3737
// May be called only once.
3838
// After this call returns successfully it is guaranteed that no
3939
// callbacks will be called. Stop() will cancel context of any in-fly
40-
// callbacks.
41-
//
42-
// If a callback is in progress (e.g. OnMessage is called but not finished)
43-
// Stop() initiates stopping and returns without waiting for stopping to finish.
44-
//
40+
// callbacks, but will wait until such in-fly callbacks are returned before
41+
// Stop returns, so make sure the callbacks don't block infinitely and react
42+
// promptly to context cancellations.
4543
// Once stopped OpAMPClient cannot be started again.
4644
Stop(ctx context.Context) error
4745

client/clientimpl_test.go

Lines changed: 0 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -483,85 +483,6 @@ func TestIncludesDetailsOnReconnect(t *testing.T) {
483483
assert.NoError(t, err)
484484
}
485485

486-
func TestStopFromCallback(t *testing.T) {
487-
// This test verifies calling Stop() from a callback. We had a bug previously
488-
// where Stop() would hang if called from a callback.
489-
490-
callbacksToTest := []string{"connect", "opamp", "message"}
491-
for _, callbackToTest := range callbacksToTest {
492-
t.Run(
493-
callbackToTest, func(t *testing.T) {
494-
495-
testClients(
496-
t, func(t *testing.T, client OpAMPClient) {
497-
var called int64
498-
499-
hash := []byte{1, 2, 3}
500-
opampSettings := &protobufs.OpAMPConnectionSettings{DestinationEndpoint: "http://opamp.com"}
501-
502-
// Start a Server.
503-
srv := internal.StartMockServer(t)
504-
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
505-
if msg != nil {
506-
return &protobufs.ServerToAgent{
507-
ConnectionSettings: &protobufs.ConnectionSettingsOffers{
508-
Hash: hash,
509-
Opamp: opampSettings,
510-
},
511-
}
512-
}
513-
return nil
514-
}
515-
516-
// Start a client.
517-
settings := types.StartSettings{
518-
Callbacks: types.CallbacksStruct{
519-
OnConnectFunc: func() {
520-
if callbackToTest == "connect" {
521-
client.Stop(context.Background())
522-
atomic.StoreInt64(&called, 1)
523-
}
524-
},
525-
OnOpampConnectionSettingsFunc: func(
526-
ctx context.Context, settings *protobufs.OpAMPConnectionSettings,
527-
) error {
528-
if callbackToTest == "opamp" {
529-
client.Stop(context.Background())
530-
atomic.StoreInt64(&called, 1)
531-
}
532-
return nil
533-
},
534-
OnMessageFunc: func(ctx context.Context, msg *types.MessageData) {
535-
if callbackToTest == "message" {
536-
client.Stop(context.Background())
537-
atomic.StoreInt64(&called, 1)
538-
}
539-
},
540-
},
541-
Capabilities: protobufs.AgentCapabilities_AgentCapabilities_AcceptsOpAMPConnectionSettings,
542-
}
543-
settings.OpAMPServerURL = "ws://" + srv.Endpoint
544-
prepareClient(t, &settings, client)
545-
546-
assert.NoError(t, client.Start(context.Background(), settings))
547-
548-
eventually(
549-
t, func() bool {
550-
return atomic.LoadInt64(&called) == 1
551-
},
552-
)
553-
554-
// Shutdown the Server.
555-
srv.Close()
556-
557-
// Shutdown the client.
558-
err := client.Stop(context.Background())
559-
assert.NoError(t, err)
560-
})
561-
})
562-
}
563-
}
564-
565486
func createEffectiveConfig() *protobufs.EffectiveConfig {
566487
cfg := &protobufs.EffectiveConfig{
567488
ConfigMap: &protobufs.AgentConfigMap{

client/internal/clientcommon.go

Lines changed: 6 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"errors"
66
"fmt"
77
"sync"
8-
"sync/atomic"
98

109
"google.golang.org/protobuf/proto"
1110

@@ -27,105 +26,11 @@ var (
2726
errReportsPackageStatusesNotSet = errors.New("ReportsPackageStatuses capability is not set")
2827
)
2928

30-
// CallbacksWrapper wraps Callbacks such that it is possible to query if any callback
31-
// function is in progress (called, but not yet returned). This is necessary for
32-
// safe handling of certain ClientCommon methods when they are called from the callbacks.
33-
// See for example Stop() implementation.
34-
type CallbacksWrapper struct {
35-
wrapped types.Callbacks
36-
// Greater than zero if currently processing a callback.
37-
inCallback *int64
38-
}
39-
40-
func (cc *CallbacksWrapper) OnConnect() {
41-
cc.EnterCallback()
42-
defer cc.LeaveCallback()
43-
cc.wrapped.OnConnect()
44-
}
45-
46-
func (cc *CallbacksWrapper) OnConnectFailed(err error) {
47-
cc.EnterCallback()
48-
defer cc.LeaveCallback()
49-
cc.wrapped.OnConnectFailed(err)
50-
}
51-
52-
func (cc *CallbacksWrapper) OnError(err *protobufs.ServerErrorResponse) {
53-
cc.EnterCallback()
54-
defer cc.LeaveCallback()
55-
cc.wrapped.OnError(err)
56-
}
57-
58-
func (cc *CallbacksWrapper) OnMessage(ctx context.Context, msg *types.MessageData) {
59-
cc.EnterCallback()
60-
defer cc.LeaveCallback()
61-
cc.wrapped.OnMessage(ctx, msg)
62-
}
63-
64-
func (cc *CallbacksWrapper) OnOpampConnectionSettings(
65-
ctx context.Context, settings *protobufs.OpAMPConnectionSettings,
66-
) error {
67-
cc.EnterCallback()
68-
defer cc.LeaveCallback()
69-
return cc.wrapped.OnOpampConnectionSettings(ctx, settings)
70-
}
71-
72-
func (cc *CallbacksWrapper) OnOpampConnectionSettingsAccepted(settings *protobufs.OpAMPConnectionSettings) {
73-
cc.EnterCallback()
74-
defer cc.LeaveCallback()
75-
cc.wrapped.OnOpampConnectionSettingsAccepted(settings)
76-
}
77-
78-
func (cc *CallbacksWrapper) SaveRemoteConfigStatus(ctx context.Context, status *protobufs.RemoteConfigStatus) {
79-
cc.EnterCallback()
80-
defer cc.LeaveCallback()
81-
cc.wrapped.SaveRemoteConfigStatus(ctx, status)
82-
}
83-
84-
func (cc *CallbacksWrapper) GetEffectiveConfig(ctx context.Context) (*protobufs.EffectiveConfig, error) {
85-
cc.EnterCallback()
86-
defer cc.LeaveCallback()
87-
return cc.wrapped.GetEffectiveConfig(ctx)
88-
}
89-
90-
func (cc *CallbacksWrapper) OnCommand(command *protobufs.ServerToAgentCommand) error {
91-
cc.EnterCallback()
92-
defer cc.LeaveCallback()
93-
return cc.wrapped.OnCommand(command)
94-
}
95-
96-
var _ types.Callbacks = (*CallbacksWrapper)(nil)
97-
98-
func NewCallbacksWrapper(wrapped types.Callbacks) *CallbacksWrapper {
99-
zero := int64(0)
100-
101-
if wrapped == nil {
102-
// Make sure it is always safe to call Callbacks.
103-
wrapped = types.CallbacksStruct{}
104-
}
105-
106-
return &CallbacksWrapper{
107-
wrapped: wrapped,
108-
inCallback: &zero,
109-
}
110-
}
111-
112-
func (cc *CallbacksWrapper) EnterCallback() {
113-
atomic.AddInt64(cc.inCallback, 1)
114-
}
115-
116-
func (cc *CallbacksWrapper) LeaveCallback() {
117-
atomic.AddInt64(cc.inCallback, -1)
118-
}
119-
120-
func (cc *CallbacksWrapper) InCallback() bool {
121-
return atomic.LoadInt64(cc.inCallback) != 0
122-
}
123-
12429
// ClientCommon contains the OpAMP logic that is common between WebSocket and
12530
// plain HTTP transports.
12631
type ClientCommon struct {
12732
Logger types.Logger
128-
Callbacks *CallbacksWrapper
33+
Callbacks types.Callbacks
12934

13035
// Agent's capabilities defined at Start() time.
13136
Capabilities protobufs.AgentCapabilities
@@ -225,7 +130,11 @@ func (c *ClientCommon) PrepareStart(
225130
}
226131

227132
// Prepare callbacks.
228-
c.Callbacks = NewCallbacksWrapper(settings.Callbacks)
133+
c.Callbacks = settings.Callbacks
134+
if c.Callbacks == nil {
135+
// Make sure it is always safe to call Callbacks.
136+
c.Callbacks = types.CallbacksStruct{}
137+
}
229138

230139
if err := c.sender.SetInstanceUid(settings.InstanceUid); err != nil {
231140
return err
@@ -247,16 +156,6 @@ func (c *ClientCommon) Stop(ctx context.Context) error {
247156

248157
cancelFunc()
249158

250-
if c.Callbacks.InCallback() {
251-
// Stop() is called from a callback. We cannot wait and block here
252-
// because the c.stoppedSignal may not be set until the callback
253-
// returns. This is the case for example when OnMessage callback is
254-
// called. So, for this case we return immediately and the caller
255-
// needs to be aware that Stop() does not wait for stopping to
256-
// finish in this case.
257-
return nil
258-
}
259-
260159
// Wait until stopping is finished.
261160
select {
262161
case <-ctx.Done():

client/internal/httpsender.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,8 @@ import (
1313
"time"
1414

1515
"github.com/cenkalti/backoff/v4"
16-
"google.golang.org/protobuf/proto"
17-
1816
"github.com/open-telemetry/opamp-go/internal"
17+
"google.golang.org/protobuf/proto"
1918

2019
"github.com/open-telemetry/opamp-go/client/types"
2120
"github.com/open-telemetry/opamp-go/protobufs"
@@ -37,7 +36,7 @@ type HTTPSender struct {
3736
url string
3837
logger types.Logger
3938
client *http.Client
40-
callbacks *CallbacksWrapper
39+
callbacks types.Callbacks
4140
pollingIntervalMs int64
4241
compressionEnabled bool
4342

@@ -71,7 +70,7 @@ func NewHTTPSender(logger types.Logger) *HTTPSender {
7170
func (h *HTTPSender) Run(
7271
ctx context.Context,
7372
url string,
74-
callbacks *CallbacksWrapper,
73+
callbacks types.Callbacks,
7574
clientSyncedState *ClientSyncedState,
7675
packagesStateProvider types.PackagesStateProvider,
7776
capabilities protobufs.AgentCapabilities,

client/internal/httpsender_test.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,10 @@ import (
88
"testing"
99
"time"
1010

11-
"github.com/stretchr/testify/assert"
12-
1311
"github.com/open-telemetry/opamp-go/client/types"
1412
sharedinternal "github.com/open-telemetry/opamp-go/internal"
1513
"github.com/open-telemetry/opamp-go/protobufs"
14+
"github.com/stretchr/testify/assert"
1615
)
1716

1817
func TestHTTPSenderRetryForStatusTooManyRequests(t *testing.T) {
@@ -42,12 +41,12 @@ func TestHTTPSenderRetryForStatusTooManyRequests(t *testing.T) {
4241
}},
4342
}
4443
})
45-
sender.callbacks = NewCallbacksWrapper(types.CallbacksStruct{
44+
sender.callbacks = types.CallbacksStruct{
4645
OnConnectFunc: func() {
4746
},
4847
OnConnectFailedFunc: func(_ error) {
4948
},
50-
})
49+
}
5150
sender.url = url
5251
start := time.Now()
5352
resp, err := sender.sendRequestWithRetries(ctx)

client/internal/receivedprocessor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ type receivedProcessor struct {
1313
logger types.Logger
1414

1515
// Callbacks to call for corresponding messages.
16-
callbacks *CallbacksWrapper
16+
callbacks types.Callbacks
1717

1818
// A sender to cooperate with when the received message has an impact on
1919
// what will be sent later.
@@ -30,7 +30,7 @@ type receivedProcessor struct {
3030

3131
func newReceivedProcessor(
3232
logger types.Logger,
33-
callbacks *CallbacksWrapper,
33+
callbacks types.Callbacks,
3434
sender Sender,
3535
clientSyncedState *ClientSyncedState,
3636
packagesStateProvider types.PackagesStateProvider,

client/internal/wsreceiver.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"fmt"
66

77
"github.com/gorilla/websocket"
8-
98
"github.com/open-telemetry/opamp-go/client/types"
109
"github.com/open-telemetry/opamp-go/internal"
1110
"github.com/open-telemetry/opamp-go/protobufs"
@@ -16,15 +15,15 @@ type wsReceiver struct {
1615
conn *websocket.Conn
1716
logger types.Logger
1817
sender *WSSender
19-
callbacks *CallbacksWrapper
18+
callbacks types.Callbacks
2019
processor receivedProcessor
2120
}
2221

2322
// NewWSReceiver creates a new Receiver that uses WebSocket to receive
2423
// messages from the server.
2524
func NewWSReceiver(
2625
logger types.Logger,
27-
callbacks *CallbacksWrapper,
26+
callbacks types.Callbacks,
2827
conn *websocket.Conn,
2928
sender *WSSender,
3029
clientSyncedState *ClientSyncedState,

client/internal/wsreceiver_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func TestServerToAgentCommand(t *testing.T) {
7777
remoteConfigStatus: &protobufs.RemoteConfigStatus{},
7878
}
7979
sender := WSSender{}
80-
receiver := NewWSReceiver(TestLogger{t}, NewCallbacksWrapper(callbacks), nil, &sender, &clientSyncedState, nil, 0)
80+
receiver := NewWSReceiver(TestLogger{t}, callbacks, nil, &sender, &clientSyncedState, nil, 0)
8181
receiver.processor.ProcessReceivedMessage(context.Background(), &protobufs.ServerToAgent{
8282
Command: test.command,
8383
})
@@ -100,7 +100,7 @@ func TestServerToAgentCommandExclusive(t *testing.T) {
100100
},
101101
}
102102
clientSyncedState := ClientSyncedState{}
103-
receiver := NewWSReceiver(TestLogger{t}, NewCallbacksWrapper(callbacks), nil, nil, &clientSyncedState, nil, 0)
103+
receiver := NewWSReceiver(TestLogger{t}, callbacks, nil, nil, &clientSyncedState, nil, 0)
104104
receiver.processor.ProcessReceivedMessage(context.Background(), &protobufs.ServerToAgent{
105105
Command: &protobufs.ServerToAgentCommand{
106106
Type: protobufs.CommandType_CommandType_Restart,

client/wsclient.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ func (c *wsClient) tryConnectOnce(ctx context.Context) (err error, retryAfter sh
123123
var resp *http.Response
124124
conn, resp, err := c.dialer.DialContext(ctx, c.url.String(), c.requestHeader)
125125
if err != nil {
126-
if !c.common.IsStopping() {
126+
if c.common.Callbacks != nil && !c.common.IsStopping() {
127127
c.common.Callbacks.OnConnectFailed(err)
128128
}
129129
if resp != nil {
@@ -138,7 +138,9 @@ func (c *wsClient) tryConnectOnce(ctx context.Context) (err error, retryAfter sh
138138
c.connMutex.Lock()
139139
c.conn = conn
140140
c.connMutex.Unlock()
141-
c.common.Callbacks.OnConnect()
141+
if c.common.Callbacks != nil {
142+
c.common.Callbacks.OnConnect()
143+
}
142144

143145
return nil, sharedinternal.OptionalDuration{Defined: false}
144146
}
@@ -191,10 +193,9 @@ func (c *wsClient) ensureConnected(ctx context.Context) error {
191193
}
192194

193195
// runOneCycle performs the following actions:
194-
// 1. connect (try until succeeds).
195-
// 2. send first status report.
196-
// 3. receive and process messages until error happens.
197-
//
196+
// 1. connect (try until succeeds).
197+
// 2. send first status report.
198+
// 3. receive and process messages until error happens.
198199
// If it encounters an error it closes the connection and returns.
199200
// Will stop and return if Stop() is called (ctx is cancelled, isStopping is set).
200201
func (c *wsClient) runOneCycle(ctx context.Context) {

0 commit comments

Comments
 (0)