Skip to content

Commit

Permalink
Allow setting initial heartbeat interval (#305)
Browse files Browse the repository at this point in the history
Add a `HeartbeatIntervalSecond` option to `StartSettings` to allow setting initial heartbeat interval.

Resolves #303
  • Loading branch information
haoqixu authored Oct 28, 2024
1 parent d11f451 commit 36a1afd
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 12 deletions.
78 changes: 78 additions & 0 deletions client/httpclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,81 @@ func TestHTTPClientSetPollingInterval(t *testing.T) {
err := client.Stop(context.Background())
assert.NoError(t, err)
}

func TestHTTPClientStartWithHeartbeatInterval(t *testing.T) {
tests := []struct {
name string
enableHeartbeat bool
expectHeartbeats bool
}{
{"client enable heartbeat", true, true},
{"client disable heartbeat", false, false},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Start a Server.
srv := internal.StartMockServer(t)
var rcvCounter int64
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
if msg == nil {
t.Error("unexpected nil msg")
return nil
}
assert.EqualValues(t, rcvCounter, msg.SequenceNum)
atomic.AddInt64(&rcvCounter, 1)
return nil
}

// Start a client.
heartbeat := 10 * time.Millisecond
settings := types.StartSettings{
OpAMPServerURL: "http://" + srv.Endpoint,
HeartbeatInterval: &heartbeat,
}
if tt.enableHeartbeat {
settings.Capabilities = protobufs.AgentCapabilities_AgentCapabilities_ReportsHeartbeat
}
client := NewHTTP(nil)
prepareClient(t, &settings, client)

assert.NoError(t, client.Start(context.Background(), settings))

// Verify that status report is delivered.
eventually(t, func() bool { return atomic.LoadInt64(&rcvCounter) == 1 })

if tt.expectHeartbeats {
assert.Eventually(t, func() bool { return atomic.LoadInt64(&rcvCounter) >= 2 }, 5*time.Second, 10*time.Millisecond)
} else {
assert.Never(t, func() bool { return atomic.LoadInt64(&rcvCounter) >= 2 }, 50*time.Millisecond, 10*time.Millisecond)
}

// Shutdown the Server.
srv.Close()

// Shutdown the client.
err := client.Stop(context.Background())
assert.NoError(t, err)
})
}
}

func TestHTTPClientStartWithZeroHeartbeatInterval(t *testing.T) {
srv := internal.StartMockServer(t)

// Start a client.
heartbeat := 0 * time.Millisecond
settings := types.StartSettings{
OpAMPServerURL: "http://" + srv.Endpoint,
HeartbeatInterval: &heartbeat,
Capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsHeartbeat,
}
client := NewHTTP(nil)
prepareClient(t, &settings, client)

// Zero heartbeat interval is invalid for http client.
assert.Error(t, client.Start(context.Background(), settings))

// Shutdown the Server.
srv.Close()
}
6 changes: 6 additions & 0 deletions client/internal/clientcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ func (c *ClientCommon) PrepareStart(
c.Callbacks = types.CallbacksStruct{}
}

if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsHeartbeat != 0 && settings.HeartbeatInterval != nil {
if err := c.sender.SetHeartbeatInterval(*settings.HeartbeatInterval); err != nil {
return err
}
}

if err := c.sender.SetInstanceUid(settings.InstanceUid); err != nil {
return err
}
Expand Down
12 changes: 6 additions & 6 deletions client/internal/wssender.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
)

const (
defaultSendCloseMessageTimeout = 5 * time.Second
defaultHeartbeatIntervalSeconds = 30
defaultSendCloseMessageTimeout = 5 * time.Second
defaultHeartbeatIntervalMs = 30 * 1000
)

// WSSender implements the WebSocket client's sending portion of OpAMP protocol.
Expand All @@ -30,7 +30,7 @@ type WSSender struct {
err error

heartbeatIntervalUpdated chan struct{}
heartbeatIntervalSeconds atomic.Int64
heartbeatIntervalMs atomic.Int64
heartbeatTimer *time.Timer
}

Expand All @@ -43,7 +43,7 @@ func NewSender(logger types.Logger) *WSSender {
heartbeatTimer: time.NewTimer(0),
SenderCommon: NewSenderCommon(),
}
s.heartbeatIntervalSeconds.Store(defaultHeartbeatIntervalSeconds)
s.heartbeatIntervalMs.Store(defaultHeartbeatIntervalMs)

return s
}
Expand Down Expand Up @@ -80,7 +80,7 @@ func (s *WSSender) SetHeartbeatInterval(d time.Duration) error {
return errors.New("heartbeat interval for wsclient must be non-negative")
}

s.heartbeatIntervalSeconds.Store(int64(d.Seconds()))
s.heartbeatIntervalMs.Store(int64(d.Milliseconds()))
select {
case s.heartbeatIntervalUpdated <- struct{}{}:
default:
Expand All @@ -101,7 +101,7 @@ func (s *WSSender) shouldSendHeartbeat() <-chan time.Time {
}
}

if d := time.Duration(s.heartbeatIntervalSeconds.Load()) * time.Second; d != 0 {
if d := time.Duration(s.heartbeatIntervalMs.Load()) * time.Millisecond; d != 0 {
t.Reset(d)
return t.C
}
Expand Down
12 changes: 6 additions & 6 deletions client/internal/wssender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ func TestWSSenderSetHeartbeatInterval(t *testing.T) {
sender := NewSender(&sharedinternal.NopLogger{})

// Default interval should be 30s as per OpAMP Specification
assert.Equal(t, int64((30 * time.Second).Seconds()), sender.heartbeatIntervalSeconds.Load())
assert.Equal(t, int64((30 * time.Second).Milliseconds()), sender.heartbeatIntervalMs.Load())

// negative interval is invalid for http sender
assert.Error(t, sender.SetHeartbeatInterval(-1))
assert.Equal(t, int64((30 * time.Second).Seconds()), sender.heartbeatIntervalSeconds.Load())
assert.Equal(t, int64((30 * time.Second).Milliseconds()), sender.heartbeatIntervalMs.Load())

// zero is valid for ws sender
assert.NoError(t, sender.SetHeartbeatInterval(0))
assert.Equal(t, int64(0), sender.heartbeatIntervalSeconds.Load())
assert.Equal(t, int64(0), sender.heartbeatIntervalMs.Load())

var expected int64 = 10
assert.NoError(t, sender.SetHeartbeatInterval(time.Duration(expected)*time.Second))
assert.Equal(t, expected, sender.heartbeatIntervalSeconds.Load())
var expected int64 = 10000
assert.NoError(t, sender.SetHeartbeatInterval(time.Duration(expected)*time.Millisecond))
assert.Equal(t, expected, sender.heartbeatIntervalMs.Load())
}
11 changes: 11 additions & 0 deletions client/types/startsettings.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package types
import (
"crypto/tls"
"net/http"
"time"

"github.com/open-telemetry/opamp-go/protobufs"
)
Expand Down Expand Up @@ -53,4 +54,14 @@ type StartSettings struct {
// the compression is only effectively enabled if the Server also supports compression.
// The data will be compressed in both directions.
EnableCompression bool

// Optional HeartbeatInterval to configure the heartbeat interval for client.
// If nil, the default heartbeat interval (30s) will be used.
// If zero, heartbeat will be disabled for a Websocket-based client.
//
// Note that an HTTP-based client will use the heartbeat interval as its polling interval
// and zero is invalid for an HTTP-based client.
//
// If the ReportsHeartbeat capability is disabled, this option has no effect.
HeartbeatInterval *time.Duration
}
56 changes: 56 additions & 0 deletions client/wsclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,62 @@ func TestWSSenderReportsHeartbeat(t *testing.T) {
}
}

func TestWSClientStartWithHeartbeatInterval(t *testing.T) {
tests := []struct {
name string
clientEnableHeartbeat bool
expectHeartbeats bool
}{
{"client enable heartbeat", true, true},
{"client disable heartbeat", false, false},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
srv := internal.StartMockServer(t)

var conn atomic.Value
srv.OnWSConnect = func(c *websocket.Conn) {
conn.Store(c)
}
var msgCount atomic.Int64
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
msgCount.Add(1)
return nil
}

// Start an OpAMP/WebSocket client.
heartbeat := 10 * time.Millisecond
settings := types.StartSettings{
OpAMPServerURL: "ws://" + srv.Endpoint,
HeartbeatInterval: &heartbeat,
}
if tt.clientEnableHeartbeat {
settings.Capabilities = protobufs.AgentCapabilities_AgentCapabilities_ReportsHeartbeat
}
client := NewWebSocket(nil)
startClient(t, settings, client)

// Wait for connection to be established.
eventually(t, func() bool { return conn.Load() != nil })

if tt.expectHeartbeats {
assert.Eventually(t, func() bool {
return msgCount.Load() >= 2
}, 5*time.Second, 10*time.Millisecond)
} else {
assert.Never(t, func() bool {
return msgCount.Load() >= 2
}, 50*time.Millisecond, 10*time.Millisecond)
}

// Stop the client.
err := client.Stop(context.Background())
assert.NoError(t, err)
})
}
}

func TestDisconnectWSByServer(t *testing.T) {
// Start a Server.
srv := internal.StartMockServer(t)
Expand Down

0 comments on commit 36a1afd

Please sign in to comment.