Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow setting initial heartbeat interval #305

Merged
merged 3 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions client/httpclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,81 @@ func TestHTTPClientSetPollingInterval(t *testing.T) {
err := client.Stop(context.Background())
assert.NoError(t, err)
}

func TestHTTPClientStartWithHeartbeatInterval(t *testing.T) {
tests := []struct {
name string
enableHeartbeat bool
expectHeartbeats bool
}{
{"client enable heartbeat", true, true},
{"client disable heartbeat", false, false},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Start a Server.
srv := internal.StartMockServer(t)
var rcvCounter int64
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
if msg == nil {
t.Error("unexpected nil msg")
return nil
}
assert.EqualValues(t, rcvCounter, msg.SequenceNum)
atomic.AddInt64(&rcvCounter, 1)
return nil
}

// Start a client.
heartbeat := 10 * time.Millisecond
settings := types.StartSettings{
OpAMPServerURL: "http://" + srv.Endpoint,
HeartbeatInterval: &heartbeat,
}
if tt.enableHeartbeat {
settings.Capabilities = protobufs.AgentCapabilities_AgentCapabilities_ReportsHeartbeat
}
client := NewHTTP(nil)
prepareClient(t, &settings, client)

assert.NoError(t, client.Start(context.Background(), settings))

// Verify that status report is delivered.
eventually(t, func() bool { return atomic.LoadInt64(&rcvCounter) == 1 })

if tt.expectHeartbeats {
assert.Eventually(t, func() bool { return atomic.LoadInt64(&rcvCounter) >= 2 }, 5*time.Second, 10*time.Millisecond)
} else {
assert.Never(t, func() bool { return atomic.LoadInt64(&rcvCounter) >= 2 }, 50*time.Millisecond, 10*time.Millisecond)
}

// Shutdown the Server.
srv.Close()

// Shutdown the client.
err := client.Stop(context.Background())
assert.NoError(t, err)
})
}
}

func TestHTTPClientStartWithZeroHeartbeatInterval(t *testing.T) {
srv := internal.StartMockServer(t)

// Start a client.
heartbeat := 0 * time.Millisecond
settings := types.StartSettings{
OpAMPServerURL: "http://" + srv.Endpoint,
HeartbeatInterval: &heartbeat,
Capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsHeartbeat,
}
client := NewHTTP(nil)
prepareClient(t, &settings, client)

// Zero heartbeat interval is invalid for http client.
assert.Error(t, client.Start(context.Background(), settings))

// Shutdown the Server.
srv.Close()
}
6 changes: 6 additions & 0 deletions client/internal/clientcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ func (c *ClientCommon) PrepareStart(
c.Callbacks = types.CallbacksStruct{}
}

if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsHeartbeat != 0 && settings.HeartbeatInterval != nil {
if err := c.sender.SetHeartbeatInterval(*settings.HeartbeatInterval); err != nil {
return err
}
}

if err := c.sender.SetInstanceUid(settings.InstanceUid); err != nil {
return err
}
Expand Down
12 changes: 6 additions & 6 deletions client/internal/wssender.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
)

const (
defaultSendCloseMessageTimeout = 5 * time.Second
defaultHeartbeatIntervalSeconds = 30
defaultSendCloseMessageTimeout = 5 * time.Second
defaultHeartbeatIntervalMs = 30 * 1000
)

// WSSender implements the WebSocket client's sending portion of OpAMP protocol.
Expand All @@ -30,7 +30,7 @@ type WSSender struct {
err error

heartbeatIntervalUpdated chan struct{}
heartbeatIntervalSeconds atomic.Int64
heartbeatIntervalMs atomic.Int64
heartbeatTimer *time.Timer
}

Expand All @@ -43,7 +43,7 @@ func NewSender(logger types.Logger) *WSSender {
heartbeatTimer: time.NewTimer(0),
SenderCommon: NewSenderCommon(),
}
s.heartbeatIntervalSeconds.Store(defaultHeartbeatIntervalSeconds)
s.heartbeatIntervalMs.Store(defaultHeartbeatIntervalMs)

return s
}
Expand Down Expand Up @@ -80,7 +80,7 @@ func (s *WSSender) SetHeartbeatInterval(d time.Duration) error {
return errors.New("heartbeat interval for wsclient must be non-negative")
}

s.heartbeatIntervalSeconds.Store(int64(d.Seconds()))
s.heartbeatIntervalMs.Store(int64(d.Milliseconds()))
select {
case s.heartbeatIntervalUpdated <- struct{}{}:
default:
Expand All @@ -101,7 +101,7 @@ func (s *WSSender) shouldSendHeartbeat() <-chan time.Time {
}
}

if d := time.Duration(s.heartbeatIntervalSeconds.Load()) * time.Second; d != 0 {
if d := time.Duration(s.heartbeatIntervalMs.Load()) * time.Millisecond; d != 0 {
t.Reset(d)
return t.C
}
Expand Down
12 changes: 6 additions & 6 deletions client/internal/wssender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ func TestWSSenderSetHeartbeatInterval(t *testing.T) {
sender := NewSender(&sharedinternal.NopLogger{})

// Default interval should be 30s as per OpAMP Specification
assert.Equal(t, int64((30 * time.Second).Seconds()), sender.heartbeatIntervalSeconds.Load())
assert.Equal(t, int64((30 * time.Second).Milliseconds()), sender.heartbeatIntervalMs.Load())

// negative interval is invalid for http sender
assert.Error(t, sender.SetHeartbeatInterval(-1))
assert.Equal(t, int64((30 * time.Second).Seconds()), sender.heartbeatIntervalSeconds.Load())
assert.Equal(t, int64((30 * time.Second).Milliseconds()), sender.heartbeatIntervalMs.Load())

// zero is valid for ws sender
assert.NoError(t, sender.SetHeartbeatInterval(0))
assert.Equal(t, int64(0), sender.heartbeatIntervalSeconds.Load())
assert.Equal(t, int64(0), sender.heartbeatIntervalMs.Load())

var expected int64 = 10
assert.NoError(t, sender.SetHeartbeatInterval(time.Duration(expected)*time.Second))
assert.Equal(t, expected, sender.heartbeatIntervalSeconds.Load())
var expected int64 = 10000
assert.NoError(t, sender.SetHeartbeatInterval(time.Duration(expected)*time.Millisecond))
assert.Equal(t, expected, sender.heartbeatIntervalMs.Load())
}
11 changes: 11 additions & 0 deletions client/types/startsettings.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package types
import (
"crypto/tls"
"net/http"
"time"

"github.com/open-telemetry/opamp-go/protobufs"
)
Expand Down Expand Up @@ -53,4 +54,14 @@ type StartSettings struct {
// the compression is only effectively enabled if the Server also supports compression.
// The data will be compressed in both directions.
EnableCompression bool

// Optional HeartbeatInterval to configure the heartbeat interval for client.
// If nil, the default heartbeat interval (30s) will be used.
// If zero, heartbeat will be disabled for a Websocket-based client.
//
// Note that an HTTP-based client will use the heartbeat interval as its polling interval
// and zero is invalid for an HTTP-based client.
//
// If the ReportsHeartbeat capability is disabled, this option has no effect.
HeartbeatInterval *time.Duration
}
56 changes: 56 additions & 0 deletions client/wsclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,62 @@ func TestWSSenderReportsHeartbeat(t *testing.T) {
}
}

func TestWSClientStartWithHeartbeatInterval(t *testing.T) {
tests := []struct {
name string
clientEnableHeartbeat bool
expectHeartbeats bool
}{
{"client enable heartbeat", true, true},
{"client disable heartbeat", false, false},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
srv := internal.StartMockServer(t)

var conn atomic.Value
srv.OnWSConnect = func(c *websocket.Conn) {
conn.Store(c)
}
var msgCount atomic.Int64
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
msgCount.Add(1)
return nil
}

// Start an OpAMP/WebSocket client.
heartbeat := 10 * time.Millisecond
settings := types.StartSettings{
OpAMPServerURL: "ws://" + srv.Endpoint,
HeartbeatInterval: &heartbeat,
}
if tt.clientEnableHeartbeat {
settings.Capabilities = protobufs.AgentCapabilities_AgentCapabilities_ReportsHeartbeat
}
client := NewWebSocket(nil)
startClient(t, settings, client)

// Wait for connection to be established.
eventually(t, func() bool { return conn.Load() != nil })

if tt.expectHeartbeats {
assert.Eventually(t, func() bool {
return msgCount.Load() >= 2
}, 5*time.Second, 10*time.Millisecond)
} else {
assert.Never(t, func() bool {
haoqixu marked this conversation as resolved.
Show resolved Hide resolved
return msgCount.Load() >= 2
}, 50*time.Millisecond, 10*time.Millisecond)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has a chance of failing to catch an unexpected heartbeat if the heartbeat is delayed, but I think that's fine.

}

// Stop the client.
err := client.Stop(context.Background())
assert.NoError(t, err)
})
}
}

func TestDisconnectWSByServer(t *testing.T) {
// Start a Server.
srv := internal.StartMockServer(t)
Expand Down
Loading