From 386cf669b8944c189bcc3a91bf8f1349552c25f9 Mon Sep 17 00:00:00 2001 From: haoqixu Date: Wed, 23 Oct 2024 16:34:44 +0800 Subject: [PATCH 1/3] Allow setting initial heartbeat interval --- client/httpclient_test.go | 79 +++++++++++++++++++++++++++++++++ client/internal/clientcommon.go | 7 +++ client/types/startsettings.go | 10 +++++ client/wsclient_test.go | 56 +++++++++++++++++++++++ 4 files changed, 152 insertions(+) diff --git a/client/httpclient_test.go b/client/httpclient_test.go index 6e9cdd31..9fbb41c4 100644 --- a/client/httpclient_test.go +++ b/client/httpclient_test.go @@ -145,3 +145,82 @@ func TestHTTPClientSetPollingInterval(t *testing.T) { err := client.Stop(context.Background()) assert.NoError(t, err) } + +func TestHTTPClientStartWithHeartbeatInterval(t *testing.T) { + tests := []struct { + name string + enableHeartbeat bool + expectHeartbeats bool + }{ + {"client enable heartbeat", true, true}, + {"client disable heartbeat", false, false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Start a Server. + srv := internal.StartMockServer(t) + var rcvCounter int64 + srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent { + if msg == nil { + t.Error("unexpected nil msg") + return nil + } + assert.EqualValues(t, rcvCounter, msg.SequenceNum) + atomic.AddInt64(&rcvCounter, 1) + return nil + } + + // Start a client. + heartbeatSec := 1 + settings := types.StartSettings{ + OpAMPServerURL: "http://" + srv.Endpoint, + HeartbeatIntervalSecond: &heartbeatSec, + } + if tt.enableHeartbeat { + settings.Capabilities = protobufs.AgentCapabilities_AgentCapabilities_ReportsHeartbeat + } + client := NewHTTP(nil) + prepareClient(t, &settings, client) + + assert.NoError(t, client.Start(context.Background(), settings)) + + // Verify that status report is delivered. + eventually(t, func() bool { return atomic.LoadInt64(&rcvCounter) == 1 }) + + if tt.expectHeartbeats { + // Verify that status report is delivered again. no call is made for next 100ms + assert.Eventually(t, func() bool { return atomic.LoadInt64(&rcvCounter) == 2 }, 5*time.Second, 100*time.Millisecond) + } else { + assert.Never(t, func() bool { return atomic.LoadInt64(&rcvCounter) == 2 }, 5*time.Second, 100*time.Millisecond) + } + + // Shutdown the Server. + srv.Close() + + // Shutdown the client. + err := client.Stop(context.Background()) + assert.NoError(t, err) + }) + } +} + +func TestHTTPClientStartWithZeroHeartbeatInterval(t *testing.T) { + srv := internal.StartMockServer(t) + + // Start a client. + heartbeat := 0 + settings := types.StartSettings{ + OpAMPServerURL: "http://" + srv.Endpoint, + HeartbeatIntervalSecond: &heartbeat, + Capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsHeartbeat, + } + client := NewHTTP(nil) + prepareClient(t, &settings, client) + + // Zero heartbeat interval is invalid for http client. + assert.Error(t, client.Start(context.Background(), settings)) + + // Shutdown the Server. + srv.Close() +} diff --git a/client/internal/clientcommon.go b/client/internal/clientcommon.go index b2f87f5d..2024d926 100644 --- a/client/internal/clientcommon.go +++ b/client/internal/clientcommon.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "sync" + "time" "google.golang.org/protobuf/proto" @@ -137,6 +138,12 @@ func (c *ClientCommon) PrepareStart( c.Callbacks = types.CallbacksStruct{} } + if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsHeartbeat != 0 && settings.HeartbeatIntervalSecond != nil { + if err := c.sender.SetHeartbeatInterval(time.Duration(*settings.HeartbeatIntervalSecond) * time.Second); err != nil { + return err + } + } + if err := c.sender.SetInstanceUid(settings.InstanceUid); err != nil { return err } diff --git a/client/types/startsettings.go b/client/types/startsettings.go index b968c7ff..f35372c9 100644 --- a/client/types/startsettings.go +++ b/client/types/startsettings.go @@ -53,4 +53,14 @@ type StartSettings struct { // the compression is only effectively enabled if the Server also supports compression. // The data will be compressed in both directions. EnableCompression bool + + // Optional HeartbeatIntervalSecond to configure the heartbeat interval for client. + // If nil, the default heartbeat interval (30s) will be used. + // If zero, heartbeat will be disabled for a Websocket-based client. + // + // Note that an HTTP-based client will use the heartbeat interval as its polling interval + // and zero is invalid for an HTTP-based client. + // + // If the ReportsHeartbeat capability is disabled, this option has no effect. + HeartbeatIntervalSecond *int } diff --git a/client/wsclient_test.go b/client/wsclient_test.go index 76053ab6..9e9d49bc 100644 --- a/client/wsclient_test.go +++ b/client/wsclient_test.go @@ -93,6 +93,62 @@ func TestWSSenderReportsHeartbeat(t *testing.T) { } } +func TestWSClientStartWithHeartbeatInterval(t *testing.T) { + tests := []struct { + name string + clientEnableHeartbeat bool + expectHeartbeats bool + }{ + {"client enable heartbeat", true, true}, + {"client disable heartbeat", false, false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + srv := internal.StartMockServer(t) + + var conn atomic.Value + srv.OnWSConnect = func(c *websocket.Conn) { + conn.Store(c) + } + var msgCount atomic.Int64 + srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent { + msgCount.Add(1) + return nil + } + + // Start an OpAMP/WebSocket client. + heartbeatSec := 1 + settings := types.StartSettings{ + OpAMPServerURL: "ws://" + srv.Endpoint, + HeartbeatIntervalSecond: &heartbeatSec, + } + if tt.clientEnableHeartbeat { + settings.Capabilities = protobufs.AgentCapabilities_AgentCapabilities_ReportsHeartbeat + } + client := NewWebSocket(nil) + startClient(t, settings, client) + + // Wait for connection to be established. + eventually(t, func() bool { return conn.Load() != nil }) + + if tt.expectHeartbeats { + assert.Eventually(t, func() bool { + return msgCount.Load() >= 2 + }, 3*time.Second, 100*time.Millisecond) + } else { + assert.Never(t, func() bool { + return msgCount.Load() >= 2 + }, 3*time.Second, 100*time.Millisecond) + } + + // Stop the client. + err := client.Stop(context.Background()) + assert.NoError(t, err) + }) + } +} + func TestDisconnectWSByServer(t *testing.T) { // Start a Server. srv := internal.StartMockServer(t) From 58b5e732cbf6b4cd803c06173cbe6d457a4a1214 Mon Sep 17 00:00:00 2001 From: haoqixu Date: Thu, 24 Oct 2024 00:29:46 +0800 Subject: [PATCH 2/3] update wssender to support ms granularity and use `time.Duration` for the option --- client/httpclient_test.go | 20 ++++++++++---------- client/internal/clientcommon.go | 5 ++--- client/internal/wssender.go | 12 ++++++------ client/internal/wssender_test.go | 12 ++++++------ client/types/startsettings.go | 5 +++-- client/wsclient_test.go | 10 +++++----- 6 files changed, 32 insertions(+), 32 deletions(-) diff --git a/client/httpclient_test.go b/client/httpclient_test.go index 9fbb41c4..bde616a4 100644 --- a/client/httpclient_test.go +++ b/client/httpclient_test.go @@ -172,10 +172,10 @@ func TestHTTPClientStartWithHeartbeatInterval(t *testing.T) { } // Start a client. - heartbeatSec := 1 + heartbeat := 10 * time.Millisecond settings := types.StartSettings{ - OpAMPServerURL: "http://" + srv.Endpoint, - HeartbeatIntervalSecond: &heartbeatSec, + OpAMPServerURL: "http://" + srv.Endpoint, + HeartbeatInterval: &heartbeat, } if tt.enableHeartbeat { settings.Capabilities = protobufs.AgentCapabilities_AgentCapabilities_ReportsHeartbeat @@ -189,10 +189,10 @@ func TestHTTPClientStartWithHeartbeatInterval(t *testing.T) { eventually(t, func() bool { return atomic.LoadInt64(&rcvCounter) == 1 }) if tt.expectHeartbeats { - // Verify that status report is delivered again. no call is made for next 100ms - assert.Eventually(t, func() bool { return atomic.LoadInt64(&rcvCounter) == 2 }, 5*time.Second, 100*time.Millisecond) + // Verify that status report is delivered again. no call is made for next 10ms + assert.Eventually(t, func() bool { return atomic.LoadInt64(&rcvCounter) >= 2 }, 50*time.Second, 10*time.Millisecond) } else { - assert.Never(t, func() bool { return atomic.LoadInt64(&rcvCounter) == 2 }, 5*time.Second, 100*time.Millisecond) + assert.Never(t, func() bool { return atomic.LoadInt64(&rcvCounter) >= 2 }, 50*time.Millisecond, 10*time.Millisecond) } // Shutdown the Server. @@ -209,11 +209,11 @@ func TestHTTPClientStartWithZeroHeartbeatInterval(t *testing.T) { srv := internal.StartMockServer(t) // Start a client. - heartbeat := 0 + heartbeat := 0 * time.Millisecond settings := types.StartSettings{ - OpAMPServerURL: "http://" + srv.Endpoint, - HeartbeatIntervalSecond: &heartbeat, - Capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsHeartbeat, + OpAMPServerURL: "http://" + srv.Endpoint, + HeartbeatInterval: &heartbeat, + Capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsHeartbeat, } client := NewHTTP(nil) prepareClient(t, &settings, client) diff --git a/client/internal/clientcommon.go b/client/internal/clientcommon.go index 2024d926..5369cf15 100644 --- a/client/internal/clientcommon.go +++ b/client/internal/clientcommon.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "sync" - "time" "google.golang.org/protobuf/proto" @@ -138,8 +137,8 @@ func (c *ClientCommon) PrepareStart( c.Callbacks = types.CallbacksStruct{} } - if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsHeartbeat != 0 && settings.HeartbeatIntervalSecond != nil { - if err := c.sender.SetHeartbeatInterval(time.Duration(*settings.HeartbeatIntervalSecond) * time.Second); err != nil { + if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsHeartbeat != 0 && settings.HeartbeatInterval != nil { + if err := c.sender.SetHeartbeatInterval(*settings.HeartbeatInterval); err != nil { return err } } diff --git a/client/internal/wssender.go b/client/internal/wssender.go index 19216b5a..16b5014a 100644 --- a/client/internal/wssender.go +++ b/client/internal/wssender.go @@ -15,8 +15,8 @@ import ( ) const ( - defaultSendCloseMessageTimeout = 5 * time.Second - defaultHeartbeatIntervalSeconds = 30 + defaultSendCloseMessageTimeout = 5 * time.Second + defaultHeartbeatIntervalMs = 30 * 1000 ) // WSSender implements the WebSocket client's sending portion of OpAMP protocol. @@ -30,7 +30,7 @@ type WSSender struct { err error heartbeatIntervalUpdated chan struct{} - heartbeatIntervalSeconds atomic.Int64 + heartbeatIntervalMs atomic.Int64 heartbeatTimer *time.Timer } @@ -43,7 +43,7 @@ func NewSender(logger types.Logger) *WSSender { heartbeatTimer: time.NewTimer(0), SenderCommon: NewSenderCommon(), } - s.heartbeatIntervalSeconds.Store(defaultHeartbeatIntervalSeconds) + s.heartbeatIntervalMs.Store(defaultHeartbeatIntervalMs) return s } @@ -80,7 +80,7 @@ func (s *WSSender) SetHeartbeatInterval(d time.Duration) error { return errors.New("heartbeat interval for wsclient must be non-negative") } - s.heartbeatIntervalSeconds.Store(int64(d.Seconds())) + s.heartbeatIntervalMs.Store(int64(d.Milliseconds())) select { case s.heartbeatIntervalUpdated <- struct{}{}: default: @@ -101,7 +101,7 @@ func (s *WSSender) shouldSendHeartbeat() <-chan time.Time { } } - if d := time.Duration(s.heartbeatIntervalSeconds.Load()) * time.Second; d != 0 { + if d := time.Duration(s.heartbeatIntervalMs.Load()) * time.Millisecond; d != 0 { t.Reset(d) return t.C } diff --git a/client/internal/wssender_test.go b/client/internal/wssender_test.go index 29b8a218..c3c58d62 100644 --- a/client/internal/wssender_test.go +++ b/client/internal/wssender_test.go @@ -12,17 +12,17 @@ func TestWSSenderSetHeartbeatInterval(t *testing.T) { sender := NewSender(&sharedinternal.NopLogger{}) // Default interval should be 30s as per OpAMP Specification - assert.Equal(t, int64((30 * time.Second).Seconds()), sender.heartbeatIntervalSeconds.Load()) + assert.Equal(t, int64((30 * time.Second).Milliseconds()), sender.heartbeatIntervalMs.Load()) // negative interval is invalid for http sender assert.Error(t, sender.SetHeartbeatInterval(-1)) - assert.Equal(t, int64((30 * time.Second).Seconds()), sender.heartbeatIntervalSeconds.Load()) + assert.Equal(t, int64((30 * time.Second).Milliseconds()), sender.heartbeatIntervalMs.Load()) // zero is valid for ws sender assert.NoError(t, sender.SetHeartbeatInterval(0)) - assert.Equal(t, int64(0), sender.heartbeatIntervalSeconds.Load()) + assert.Equal(t, int64(0), sender.heartbeatIntervalMs.Load()) - var expected int64 = 10 - assert.NoError(t, sender.SetHeartbeatInterval(time.Duration(expected)*time.Second)) - assert.Equal(t, expected, sender.heartbeatIntervalSeconds.Load()) + var expected int64 = 10000 + assert.NoError(t, sender.SetHeartbeatInterval(time.Duration(expected)*time.Millisecond)) + assert.Equal(t, expected, sender.heartbeatIntervalMs.Load()) } diff --git a/client/types/startsettings.go b/client/types/startsettings.go index f35372c9..6184d575 100644 --- a/client/types/startsettings.go +++ b/client/types/startsettings.go @@ -3,6 +3,7 @@ package types import ( "crypto/tls" "net/http" + "time" "github.com/open-telemetry/opamp-go/protobufs" ) @@ -54,7 +55,7 @@ type StartSettings struct { // The data will be compressed in both directions. EnableCompression bool - // Optional HeartbeatIntervalSecond to configure the heartbeat interval for client. + // Optional HeartbeatInterval to configure the heartbeat interval for client. // If nil, the default heartbeat interval (30s) will be used. // If zero, heartbeat will be disabled for a Websocket-based client. // @@ -62,5 +63,5 @@ type StartSettings struct { // and zero is invalid for an HTTP-based client. // // If the ReportsHeartbeat capability is disabled, this option has no effect. - HeartbeatIntervalSecond *int + HeartbeatInterval *time.Duration } diff --git a/client/wsclient_test.go b/client/wsclient_test.go index 9e9d49bc..b9a18682 100644 --- a/client/wsclient_test.go +++ b/client/wsclient_test.go @@ -118,10 +118,10 @@ func TestWSClientStartWithHeartbeatInterval(t *testing.T) { } // Start an OpAMP/WebSocket client. - heartbeatSec := 1 + heartbeat := 10 * time.Millisecond settings := types.StartSettings{ - OpAMPServerURL: "ws://" + srv.Endpoint, - HeartbeatIntervalSecond: &heartbeatSec, + OpAMPServerURL: "ws://" + srv.Endpoint, + HeartbeatInterval: &heartbeat, } if tt.clientEnableHeartbeat { settings.Capabilities = protobufs.AgentCapabilities_AgentCapabilities_ReportsHeartbeat @@ -135,11 +135,11 @@ func TestWSClientStartWithHeartbeatInterval(t *testing.T) { if tt.expectHeartbeats { assert.Eventually(t, func() bool { return msgCount.Load() >= 2 - }, 3*time.Second, 100*time.Millisecond) + }, 50*time.Millisecond, 10*time.Millisecond) } else { assert.Never(t, func() bool { return msgCount.Load() >= 2 - }, 3*time.Second, 100*time.Millisecond) + }, 50*time.Millisecond, 10*time.Millisecond) } // Stop the client. From 74a288058acd017e8186ee5db9426812148eb23d Mon Sep 17 00:00:00 2001 From: haoqixu Date: Thu, 24 Oct 2024 11:40:23 +0800 Subject: [PATCH 3/3] update the waiting timeout in tests --- client/httpclient_test.go | 3 +-- client/wsclient_test.go | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/client/httpclient_test.go b/client/httpclient_test.go index bde616a4..a3845c45 100644 --- a/client/httpclient_test.go +++ b/client/httpclient_test.go @@ -189,8 +189,7 @@ func TestHTTPClientStartWithHeartbeatInterval(t *testing.T) { eventually(t, func() bool { return atomic.LoadInt64(&rcvCounter) == 1 }) if tt.expectHeartbeats { - // Verify that status report is delivered again. no call is made for next 10ms - assert.Eventually(t, func() bool { return atomic.LoadInt64(&rcvCounter) >= 2 }, 50*time.Second, 10*time.Millisecond) + assert.Eventually(t, func() bool { return atomic.LoadInt64(&rcvCounter) >= 2 }, 5*time.Second, 10*time.Millisecond) } else { assert.Never(t, func() bool { return atomic.LoadInt64(&rcvCounter) >= 2 }, 50*time.Millisecond, 10*time.Millisecond) } diff --git a/client/wsclient_test.go b/client/wsclient_test.go index b9a18682..a8322a9f 100644 --- a/client/wsclient_test.go +++ b/client/wsclient_test.go @@ -135,7 +135,7 @@ func TestWSClientStartWithHeartbeatInterval(t *testing.T) { if tt.expectHeartbeats { assert.Eventually(t, func() bool { return msgCount.Load() >= 2 - }, 50*time.Millisecond, 10*time.Millisecond) + }, 5*time.Second, 10*time.Millisecond) } else { assert.Never(t, func() bool { return msgCount.Load() >= 2