Skip to content

Commit

Permalink
Simplify status compression (#93)
Browse files Browse the repository at this point in the history
- Eliminate all "hash" fields.
- Allow omitting the sub-messages of AgentToServer message. When omitted it is implied that previously reported value of the sub-message is current (unchanged).
- To detect lost messages have one auto-incremented sequence_num field AgentToServer message. Server can easily detect losses by just keeping the last sequence_num (as opposed to keeping 4 different hashes).

Implements spec change open-telemetry/opamp-spec#101
  • Loading branch information
tigrannajaryan authored Jun 29, 2022
1 parent 301ef45 commit aad0403
Show file tree
Hide file tree
Showing 15 changed files with 518 additions and 864 deletions.
3 changes: 0 additions & 3 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ type OpAMPClient interface {
// their AgentDescription to change dynamically while the OpAMPClient is started.
// May be also called from OnMessage handler.
//
// The Hash field will be calculated and updated from the content of the rest of
// the fields.
//
// nil values are not allowed and will return an error.
SetAgentDescription(descr *protobufs.AgentDescription) error

Expand Down
65 changes: 27 additions & 38 deletions client/clientimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ func TestFirstStatusReport(t *testing.T) {
// Start a Server.
srv := internal.StartMockServer(t)
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, 0, msg.SequenceNum)
return &protobufs.ServerToAgent{
InstanceUid: msg.InstanceUid,
RemoteConfig: remoteConfig,
Expand Down Expand Up @@ -353,8 +354,13 @@ func TestFirstStatusReport(t *testing.T) {
func TestIncludesDetailsOnReconnect(t *testing.T) {
srv := internal.StartMockServer(t)

seqNum := 0

var receivedDetails int64
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, seqNum, msg.SequenceNum)
seqNum++

// Track when we receive AgentDescription
if msg.AgentDescription != nil {
atomic.AddInt64(&receivedDetails, 1)
Expand Down Expand Up @@ -687,6 +693,7 @@ func TestReportAgentDescription(t *testing.T) {

// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, 0, msg.SequenceNum)
// The first status report after Start must have full AgentDescription.
assert.True(t, proto.Equal(client.AgentDescription(), msg.AgentDescription))
return &protobufs.ServerToAgent{InstanceUid: msg.InstanceUid}
Expand All @@ -699,25 +706,22 @@ func TestReportAgentDescription(t *testing.T) {
// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
// The status report must have compressed AgentDescription.
descr := msg.AgentDescription
assert.Nil(t, descr.IdentifyingAttributes)
assert.Nil(t, descr.NonIdentifyingAttributes)
assert.Nil(t, msg.AgentDescription)

// The Hash field must be present and unchanged.
assert.NotNil(t, descr.Hash)
assert.EqualValues(t, client.AgentDescription().Hash, descr.Hash)
assert.EqualValues(t, 1, msg.SequenceNum)

// Ask client for full AgentDescription.
return &protobufs.ServerToAgent{
InstanceUid: msg.InstanceUid,
Flags: protobufs.ServerToAgent_ReportAgentDescription,
Flags: protobufs.ServerToAgent_ReportFullState,
}
})

// Server has requested the client to report, so there will be another message
// coming to the Server.
// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, 2, msg.SequenceNum)
// The status report must again have full AgentDescription
// because the Server asked for it.
assert.True(t, proto.Equal(client.AgentDescription(), msg.AgentDescription))
Expand Down Expand Up @@ -758,6 +762,7 @@ func TestReportEffectiveConfig(t *testing.T) {

// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, 0, msg.SequenceNum)
// The first status report after Start must have full EffectiveConfig.
assert.True(t, proto.Equal(clientEffectiveConfig, msg.EffectiveConfig))
return &protobufs.ServerToAgent{InstanceUid: msg.InstanceUid}
Expand All @@ -770,23 +775,21 @@ func TestReportEffectiveConfig(t *testing.T) {
// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
// The status report must have compressed EffectiveConfig.
cfg := msg.EffectiveConfig
assert.Nil(t, cfg.ConfigMap)
assert.Nil(t, msg.EffectiveConfig)

// Hash must be present and unchanged.
assert.NotNil(t, cfg.Hash)
assert.EqualValues(t, clientEffectiveConfig.Hash, cfg.Hash)
assert.EqualValues(t, 1, msg.SequenceNum)

// Ask client for full AgentDescription.
return &protobufs.ServerToAgent{
InstanceUid: msg.InstanceUid,
Flags: protobufs.ServerToAgent_ReportEffectiveConfig,
Flags: protobufs.ServerToAgent_ReportFullState,
}
})

// Server has requested the client to report, so there will be another message.
// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, 2, msg.SequenceNum)
// The status report must again have full EffectiveConfig
// because Server asked for it.
assert.True(t, proto.Equal(clientEffectiveConfig, msg.EffectiveConfig))
Expand Down Expand Up @@ -841,6 +844,7 @@ func verifyRemoteConfigUpdate(t *testing.T, successCase bool, expectStatus *prot
remoteCfg := createRemoteConfig()
// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, 0, msg.SequenceNum)
// Send the remote config to the Agent.
return &protobufs.ServerToAgent{
InstanceUid: msg.InstanceUid,
Expand All @@ -855,12 +859,12 @@ func verifyRemoteConfigUpdate(t *testing.T, successCase bool, expectStatus *prot

// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, 1, msg.SequenceNum)
// Verify that the remote config status is as expected.
status := msg.RemoteConfigStatus
assert.EqualValues(t, expectStatus.Status, status.Status)
assert.Equal(t, expectStatus.ErrorMessage, status.ErrorMessage)
assert.EqualValues(t, remoteCfg.ConfigHash, status.LastRemoteConfigHash)
assert.NotNil(t, status.Hash)

firstConfigStatus = proto.Clone(status).(*protobufs.RemoteConfigStatus)

Expand All @@ -873,24 +877,21 @@ func verifyRemoteConfigUpdate(t *testing.T, successCase bool, expectStatus *prot

// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
// This time all fields except Hash must be unset. This is expected
// This time the RemoteConfigStatus field must be unset. This is expected
// as compression in OpAMP.
status := msg.RemoteConfigStatus
require.NotNil(t, status)
assert.EqualValues(t, firstConfigStatus.Hash, status.Hash)
assert.EqualValues(t, protobufs.RemoteConfigStatus_UNSET, status.Status)
assert.EqualValues(t, "", status.ErrorMessage)
assert.Nil(t, status.LastRemoteConfigHash)
require.Nil(t, msg.RemoteConfigStatus)
assert.EqualValues(t, 2, msg.SequenceNum)

return &protobufs.ServerToAgent{
InstanceUid: msg.InstanceUid,
// Ask client to report full status.
Flags: protobufs.ServerToAgent_ReportRemoteConfigStatus,
Flags: protobufs.ServerToAgent_ReportFullState,
}
})

// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, 3, msg.SequenceNum)
// Exact same full status must be present again.
status := msg.RemoteConfigStatus
assert.True(t, proto.Equal(status, firstConfigStatus))
Expand Down Expand Up @@ -992,6 +993,7 @@ func verifyUpdatePackages(t *testing.T, testCase packageTestCase) {

// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, 0, msg.SequenceNum)
// Send the packages to the Agent.
return &protobufs.ServerToAgent{
InstanceUid: msg.InstanceUid,
Expand All @@ -1002,8 +1004,6 @@ func verifyUpdatePackages(t *testing.T, testCase packageTestCase) {
// The Agent will try to install the packages and will send the status
// report about it back to the Server.

var lastStatusHash []byte

// ---> Server
// Wait for the expected package statuses to be received.
srv.EventuallyExpect("full PackageStatuses",
Expand All @@ -1013,7 +1013,6 @@ func verifyUpdatePackages(t *testing.T, testCase packageTestCase) {
status := msg.PackageStatuses
require.NotNil(t, status)
assert.EqualValues(t, testCase.expectedStatus.ServerProvidedAllPackagesHash, status.ServerProvidedAllPackagesHash)
lastStatusHash = status.Hash

if testCase.expectedError != "" {
assert.EqualValues(t, testCase.expectedError, status.ErrorMessage)
Expand Down Expand Up @@ -1046,7 +1045,6 @@ func verifyUpdatePackages(t *testing.T, testCase packageTestCase) {
assert.Len(t, status.Packages, len(testCase.available.Packages))
}
}
assert.NotNil(t, status.Hash)

return &protobufs.ServerToAgent{InstanceUid: msg.InstanceUid}, expectedStatusReceived
})
Expand All @@ -1069,21 +1067,13 @@ func verifyUpdatePackages(t *testing.T, testCase packageTestCase) {
srv.EventuallyExpect("compressed PackageStatuses",
func(msg *protobufs.AgentToServer) (*protobufs.ServerToAgent, bool) {
// Ensure that compressed status is received.
status := msg.PackageStatuses
require.NotNil(t, status)
compressedReceived := status.ServerProvidedAllPackagesHash == nil
if compressedReceived {
assert.Nil(t, status.ServerProvidedAllPackagesHash)
assert.Nil(t, status.Packages)
}
assert.NotNil(t, status.Hash)
assert.Equal(t, lastStatusHash, status.Hash)
compressedReceived := msg.PackageStatuses == nil

response := &protobufs.ServerToAgent{InstanceUid: msg.InstanceUid}

if compressedReceived {
// Ask for full report again.
response.Flags = protobufs.ServerToAgent_ReportPackageStatuses
response.Flags = protobufs.ServerToAgent_ReportFullState
} else {
// Keep triggering status report by setting AgentDescription
// until the compressed PackageStatuses arrives.
Expand Down Expand Up @@ -1114,8 +1104,7 @@ func createDownloadSrv(t *testing.T) *httptest.Server {
w.WriteHeader(http.StatusOK)
_, err := w.Write(packageFileContent)
assert.NoError(t, err)
},
)
})

srv := httptest.NewServer(m)

Expand Down
1 change: 1 addition & 0 deletions client/httpclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func TestHTTPPolling(t *testing.T) {
srv := internal.StartMockServer(t)
var rcvCounter int64
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, rcvCounter, msg.SequenceNum)
if msg != nil {
atomic.AddInt64(&rcvCounter, 1)
}
Expand Down
53 changes: 6 additions & 47 deletions client/internal/clientcommon.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package internal

import (
"bytes"
"context"
"crypto/sha256"
"errors"
"fmt"
"sort"
"sync"

"github.com/open-telemetry/opamp-go/client/types"
Expand Down Expand Up @@ -175,9 +172,6 @@ func (c *ClientCommon) PrepareFirstMessage(ctx context.Context) error {
if err != nil {
return err
}
if cfg != nil {
calcHashEffectiveConfig(cfg)
}

c.sender.NextMessage().Update(
func(msg *protobufs.AgentToServer) {
Expand Down Expand Up @@ -219,36 +213,6 @@ func (c *ClientCommon) SetAgentDescription(descr *protobufs.AgentDescription) er
return nil
}

// calcHashEffectiveConfig calculates and sets the Hash field from the rest of the
// fields in the message.
func calcHashEffectiveConfig(msg *protobufs.EffectiveConfig) {
cfgMap := msg.GetConfigMap().GetConfigMap()

// Construct hash
h := sha256.New()

// If the config is empty don't attemp to add more to the hash
if len(cfgMap) > 0 {
// Sort keys of configMap to make deterministic hash
keys := make([]string, 0, len(cfgMap))
for k := range cfgMap {
keys = append(keys, k)
}

sort.Strings(keys)

if msg.ConfigMap != nil {
for _, k := range keys {
v := cfgMap[k]
h.Write([]byte(k))
h.Write(v.Body)
h.Write([]byte(v.ContentType))
}
}
}
msg.Hash = h.Sum(nil)
}

// UpdateEffectiveConfig fetches the current local effective config using
// GetEffectiveConfig callback and sends it to the Server using provided Sender.
func (c *ClientCommon) UpdateEffectiveConfig(ctx context.Context) error {
Expand All @@ -257,9 +221,7 @@ func (c *ClientCommon) UpdateEffectiveConfig(ctx context.Context) error {
if err != nil {
return fmt.Errorf("GetEffectiveConfig failed: %w", err)
}
if cfg != nil {
calcHashEffectiveConfig(cfg)
}

// Send it to the Server.
c.sender.NextMessage().Update(
func(msg *protobufs.AgentToServer) {
Expand All @@ -281,16 +243,14 @@ func (c *ClientCommon) SetRemoteConfigStatus(status *protobufs.RemoteConfigStatu
return errLastRemoteConfigHashNil
}

// Get the hash of the status before we update it.
prevHash := c.ClientSyncedState.RemoteConfigStatus().GetHash()
statusChanged := !proto.Equal(c.ClientSyncedState.RemoteConfigStatus(), status)

// Remember the new status.
if err := c.ClientSyncedState.SetRemoteConfigStatus(status); err != nil {
return err
}

// Check if the new status is different from the previous by comparing the hashes.
if !bytes.Equal(prevHash, status.Hash) {
if statusChanged {
// Let the Server know about the new status.
c.sender.NextMessage().Update(
func(msg *protobufs.AgentToServer) {
Expand All @@ -310,15 +270,14 @@ func (c *ClientCommon) SetPackageStatuses(statuses *protobufs.PackageStatuses) e
return errServerProvidedAllPackagesHashNil
}

// Get the hash of the status before we update it.
prevHash := c.ClientSyncedState.PackageStatuses().GetHash()
statusChanged := !proto.Equal(c.ClientSyncedState.PackageStatuses(), statuses)

if err := c.ClientSyncedState.SetPackageStatuses(statuses); err != nil {
return err
}

// Check if the new status is different from the previous by comparing the hashes.
if !bytes.Equal(prevHash, statuses.Hash) {
// Check if the new status is different from the previous.
if statusChanged {
// Let the Server know about the new status.

c.sender.NextMessage().Update(
Expand Down
57 changes: 0 additions & 57 deletions client/internal/clientcommon_test.go

This file was deleted.

Loading

0 comments on commit aad0403

Please sign in to comment.