Skip to content

Commit

Permalink
Refactor OpAMPClient callbacks (#87)
Browse files Browse the repository at this point in the history
Resolves #77

We previously delivered the data received in one ServerToAgent message using
multiple callbacks. This encouraged piecemeal processing and resulted in
multiple state changes when only one would be sufficient.

The new OnMessage callback delivers all data that can affect the state at once
which makes it possible process it in one pass. In the Supervisor this allows
to restart the Agent only once when applying changes that result in config
file change.
  • Loading branch information
tigrannajaryan authored Jun 7, 2022
1 parent 3f2eab4 commit 301ef45
Show file tree
Hide file tree
Showing 13 changed files with 384 additions and 586 deletions.
14 changes: 14 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type OpAMPClient interface {
// May be also called after Start(), in which case the attributes will be included
// in the next outgoing status report. This is typically used by Agents which allow
// their AgentDescription to change dynamically while the OpAMPClient is started.
// May be also called from OnMessage handler.
//
// The Hash field will be calculated and updated from the content of the rest of
// the fields.
Expand All @@ -59,5 +60,18 @@ type OpAMPClient interface {

// UpdateEffectiveConfig fetches the current local effective config using
// GetEffectiveConfig callback and sends it to the Server.
// May be called anytime after Start(), including from OnMessage handler.
UpdateEffectiveConfig(ctx context.Context) error

// SetRemoteConfigStatus sets the current RemoteConfigStatus.
// LastRemoteConfigHash field must be non-nil.
// May be called anytime after Start(), including from OnMessage handler.
// nil values are not allowed and will return an error.
SetRemoteConfigStatus(status *protobufs.RemoteConfigStatus) error

// SetPackageStatuses sets the current PackageStatuses.
// ServerProvidedAllPackagesHash must be non-nil.
// May be called anytime after Start(), including from OnMessage handler.
// nil values are not allowed and will return an error.
SetPackageStatuses(statuses *protobufs.PackageStatuses) error
}
129 changes: 53 additions & 76 deletions client/clientimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package client

import (
"context"
"errors"
"math/rand"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -325,20 +324,11 @@ func TestFirstStatusReport(t *testing.T) {
OnConnectFunc: func() {
atomic.AddInt64(&connected, 1)
},
OnRemoteConfigFunc: func(
ctx context.Context,
config *protobufs.AgentRemoteConfig,
) (
effectiveConfig *protobufs.EffectiveConfig, configChanged bool,
err error,
) {
OnMessageFunc: func(ctx context.Context, msg *types.MessageData) {
// Verify that the client received exactly the remote config that
// the Server sent.
assert.True(t, proto.Equal(remoteConfig, config))
assert.True(t, proto.Equal(remoteConfig, msg.RemoteConfig))
atomic.AddInt64(&remoteConfigReceived, 1)
return &protobufs.EffectiveConfig{
ConfigMap: remoteConfig.Config,
}, true, nil
},
},
}
Expand Down Expand Up @@ -381,12 +371,6 @@ func TestIncludesDetailsOnReconnect(t *testing.T) {
OnConnectFunc: func() {
atomic.AddInt64(&connected, 1)
},
OnRemoteConfigFunc: func(
ctx context.Context,
config *protobufs.AgentRemoteConfig,
) (effectiveConfig *protobufs.EffectiveConfig, configChanged bool, err error) {
return &protobufs.EffectiveConfig{}, false, nil
},
},
}

Expand Down Expand Up @@ -565,14 +549,6 @@ func TestAgentIdentification(t *testing.T) {
// Start a client.
settings := types.StartSettings{}
settings.OpAMPServerURL = "ws://" + srv.Endpoint
settings.Callbacks = types.CallbacksStruct{
OnAgentIdentificationFunc: func(
ctx context.Context,
agentId *protobufs.AgentIdentification,
) error {
return nil
},
}
prepareClient(t, &settings, client)

oldInstanceUid := settings.InstanceUid
Expand Down Expand Up @@ -654,39 +630,24 @@ func TestConnectionSettings(t *testing.T) {
// Start a client.
settings := types.StartSettings{
Callbacks: types.CallbacksStruct{
OnMessageFunc: func(ctx context.Context, msg *types.MessageData) {
assert.True(t, proto.Equal(metricsSettings, msg.OwnMetricsConnSettings))
assert.True(t, proto.Equal(tracesSettings, msg.OwnTracesConnSettings))
assert.True(t, proto.Equal(logsSettings, msg.OwnLogsConnSettings))
atomic.AddInt64(&gotOwnSettings, 1)

assert.Len(t, msg.OtherConnSettings, 1)
assert.True(t, proto.Equal(otherSettings, msg.OtherConnSettings["other"]))
atomic.AddInt64(&gotOtherSettings, 1)
},

OnOpampConnectionSettingsFunc: func(
ctx context.Context, settings *protobufs.OpAMPConnectionSettings,
) error {
assert.True(t, proto.Equal(opampSettings, settings))
atomic.AddInt64(&gotOpampSettings, 1)
return nil
},

OnOwnTelemetryConnectionSettingsFunc: func(
ctx context.Context, telemetryType types.OwnTelemetryType,
settings *protobufs.TelemetryConnectionSettings,
) error {
switch telemetryType {
case types.OwnMetrics:
assert.True(t, proto.Equal(metricsSettings, settings))
case types.OwnTraces:
assert.True(t, proto.Equal(tracesSettings, settings))
case types.OwnLogs:
assert.True(t, proto.Equal(logsSettings, settings))
}
atomic.AddInt64(&gotOwnSettings, 1)
return nil
},

OnOtherConnectionSettingsFunc: func(
ctx context.Context, name string,
settings *protobufs.OtherConnectionSettings,
) error {
assert.EqualValues(t, "other", name)
assert.True(t, proto.Equal(otherSettings, settings))
atomic.AddInt64(&gotOtherSettings, 1)
return nil
},
},
}
settings.OpAMPServerURL = "ws://" + srv.Endpoint
Expand All @@ -695,7 +656,7 @@ func TestConnectionSettings(t *testing.T) {
assert.NoError(t, client.Start(context.Background(), settings))

eventually(t, func() bool { return atomic.LoadInt64(&gotOpampSettings) == 1 })
eventually(t, func() bool { return atomic.LoadInt64(&gotOwnSettings) == 3 })
eventually(t, func() bool { return atomic.LoadInt64(&gotOwnSettings) == 1 })
eventually(t, func() bool { return atomic.LoadInt64(&gotOtherSettings) == 1 })
eventually(t, func() bool { return atomic.LoadInt64(&rcvStatus) == 1 })

Expand Down Expand Up @@ -848,22 +809,28 @@ func verifyRemoteConfigUpdate(t *testing.T, successCase bool, expectStatus *prot
srv := internal.StartMockServer(t)
srv.EnableExpectMode()

// Prepare a callback that returns either success or failure.
onRemoteConfigFunc := func(
ctx context.Context, remoteConfig *protobufs.AgentRemoteConfig,
) (effectiveConfig *protobufs.EffectiveConfig, configChanged bool, err error) {
if successCase {
return createEffectiveConfig(), true, nil
} else {
return nil, false, errors.New("cannot update remote config")
}
}

// Start a client.
settings := types.StartSettings{
OpAMPServerURL: "ws://" + srv.Endpoint,
Callbacks: types.CallbacksStruct{
OnRemoteConfigFunc: onRemoteConfigFunc,
OnMessageFunc: func(ctx context.Context, msg *types.MessageData) {
if msg.RemoteConfig != nil {
if successCase {
client.SetRemoteConfigStatus(
&protobufs.RemoteConfigStatus{
LastRemoteConfigHash: msg.RemoteConfig.ConfigHash,
Status: protobufs.RemoteConfigStatus_APPLIED,
})
} else {
client.SetRemoteConfigStatus(
&protobufs.RemoteConfigStatus{
LastRemoteConfigHash: msg.RemoteConfig.ConfigHash,
Status: protobufs.RemoteConfigStatus_FAILED,
ErrorMessage: "cannot update remote config",
})
}
}
},
},
}
prepareClient(t, &settings, client)
Expand Down Expand Up @@ -909,6 +876,7 @@ func verifyRemoteConfigUpdate(t *testing.T, successCase bool, expectStatus *prot
// This time all fields except Hash must be unset. This is expected
// as compression in OpAMP.
status := msg.RemoteConfigStatus
require.NotNil(t, status)
assert.EqualValues(t, firstConfigStatus.Hash, status.Hash)
assert.EqualValues(t, protobufs.RemoteConfigStatus_UNSET, status.Status)
assert.EqualValues(t, "", status.ErrorMessage)
Expand Down Expand Up @@ -977,6 +945,7 @@ type packageTestCase struct {
available *protobufs.PackagesAvailable
expectedStatus *protobufs.PackageStatuses
expectedFileContent map[string][]byte
expectedError string
}

const packageUpdateErrorMsg = "cannot update packages"
Expand All @@ -993,22 +962,26 @@ func verifyUpdatePackages(t *testing.T, testCase packageTestCase) {
var syncerDoneCh <-chan struct{}

// Prepare a callback that returns either success or failure.
onPackagesAvailable := func(ctx context.Context, packages *protobufs.PackagesAvailable, syncer types.PackagesSyncer) error {
if testCase.errorOnCallback {
return errors.New(packageUpdateErrorMsg)
} else {
syncerDoneCh = syncer.Done()
err := syncer.Sync(ctx)
require.NoError(t, err)
return nil
onMessageFunc := func(ctx context.Context, msg *types.MessageData) {
if msg.PackageSyncer != nil {
if testCase.errorOnCallback {
client.SetPackageStatuses(&protobufs.PackageStatuses{
ServerProvidedAllPackagesHash: msg.PackagesAvailable.AllPackagesHash,
ErrorMessage: packageUpdateErrorMsg,
})
} else {
syncerDoneCh = msg.PackageSyncer.Done()
err := msg.PackageSyncer.Sync(ctx)
require.NoError(t, err)
}
}
}

// Start a client.
settings := types.StartSettings{
OpAMPServerURL: "ws://" + srv.Endpoint,
Callbacks: types.CallbacksStruct{
OnPackagesAvailableFunc: onPackagesAvailable,
OnMessageFunc: onMessageFunc,
},
PackagesStateProvider: localPackageState,
}
Expand Down Expand Up @@ -1042,6 +1015,11 @@ func verifyUpdatePackages(t *testing.T, testCase packageTestCase) {
assert.EqualValues(t, testCase.expectedStatus.ServerProvidedAllPackagesHash, status.ServerProvidedAllPackagesHash)
lastStatusHash = status.Hash

if testCase.expectedError != "" {
assert.EqualValues(t, testCase.expectedError, status.ErrorMessage)
return &protobufs.ServerToAgent{InstanceUid: msg.InstanceUid}, true
}

// Verify individual package statuses.
for name, pkgExpected := range testCase.expectedStatus.Packages {
pkgStatus := status.Packages[name]
Expand Down Expand Up @@ -1209,8 +1187,7 @@ func TestUpdatePackages(t *testing.T) {

// A case when OnPackagesAvailable callback returns an error.
errorOnCallback := createPackageTestCase("error on callback", downloadSrv)
errorOnCallback.expectedStatus.Packages["package1"].Status = protobufs.PackageStatus_InstallFailed
errorOnCallback.expectedStatus.Packages["package1"].ErrorMessage = packageUpdateErrorMsg
errorOnCallback.expectedError = packageUpdateErrorMsg
errorOnCallback.errorOnCallback = true
tests = append(tests, errorOnCallback)

Expand Down
8 changes: 8 additions & 0 deletions client/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ func (c *httpClient) UpdateEffectiveConfig(ctx context.Context) error {
return c.common.UpdateEffectiveConfig(ctx)
}

func (c *httpClient) SetRemoteConfigStatus(status *protobufs.RemoteConfigStatus) error {
return c.common.SetRemoteConfigStatus(status)
}

func (c *httpClient) SetPackageStatuses(statuses *protobufs.PackageStatuses) error {
return c.common.SetPackageStatuses(statuses)
}

func (c *httpClient) runUntilStopped(ctx context.Context) {
// Start the HTTP sender. This will make request/responses with retries for
// failures and will wait with configured polling interval if there is nothing
Expand Down
85 changes: 77 additions & 8 deletions client/internal/clientcommon.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package internal

import (
"bytes"
"context"
"crypto/sha256"
"errors"
Expand Down Expand Up @@ -51,10 +52,14 @@ type ClientCommon struct {
}

func NewClientCommon(logger types.Logger, sender Sender) ClientCommon {
return ClientCommon{Logger: logger, sender: sender, stoppedSignal: make(chan struct{}, 1)}
return ClientCommon{
Logger: logger, sender: sender, stoppedSignal: make(chan struct{}, 1),
}
}

func (c *ClientCommon) PrepareStart(_ context.Context, settings types.StartSettings) error {
func (c *ClientCommon) PrepareStart(
_ context.Context, settings types.StartSettings,
) error {
if c.isStarted {
return errAlreadyStarted
}
Expand Down Expand Up @@ -205,9 +210,11 @@ func (c *ClientCommon) SetAgentDescription(descr *protobufs.AgentDescription) er
if err := c.ClientSyncedState.SetAgentDescription(descr); err != nil {
return err
}
c.sender.NextMessage().Update(func(msg *protobufs.AgentToServer) {
msg.AgentDescription = c.ClientSyncedState.AgentDescription()
})
c.sender.NextMessage().Update(
func(msg *protobufs.AgentToServer) {
msg.AgentDescription = c.ClientSyncedState.AgentDescription()
},
)
c.sender.ScheduleSend()
return nil
}
Expand Down Expand Up @@ -254,13 +261,75 @@ func (c *ClientCommon) UpdateEffectiveConfig(ctx context.Context) error {
calcHashEffectiveConfig(cfg)
}
// Send it to the Server.
c.sender.NextMessage().Update(func(msg *protobufs.AgentToServer) {
msg.EffectiveConfig = cfg
})
c.sender.NextMessage().Update(
func(msg *protobufs.AgentToServer) {
msg.EffectiveConfig = cfg
},
)
// TODO: if this call is coming from OnMessage callback don't schedule the send
// immediately, wait until the end of OnMessage to send one message only.
c.sender.ScheduleSend()

// Note that we do not store the EffectiveConfig anywhere else. It will be deleted
// from NextMessage when the message is sent. This avoids storing EffectiveConfig
// in memory for longer than it is needed.
return nil
}

func (c *ClientCommon) SetRemoteConfigStatus(status *protobufs.RemoteConfigStatus) error {
if status.LastRemoteConfigHash == nil {
return errLastRemoteConfigHashNil
}

// Get the hash of the status before we update it.
prevHash := c.ClientSyncedState.RemoteConfigStatus().GetHash()

// Remember the new status.
if err := c.ClientSyncedState.SetRemoteConfigStatus(status); err != nil {
return err
}

// Check if the new status is different from the previous by comparing the hashes.
if !bytes.Equal(prevHash, status.Hash) {
// Let the Server know about the new status.
c.sender.NextMessage().Update(
func(msg *protobufs.AgentToServer) {
msg.RemoteConfigStatus = c.ClientSyncedState.RemoteConfigStatus()
},
)
// TODO: if this call is coming from OnMessage callback don't schedule the send
// immediately, wait until the end of OnMessage to send one message only.
c.sender.ScheduleSend()
}

return nil
}

func (c *ClientCommon) SetPackageStatuses(statuses *protobufs.PackageStatuses) error {
if statuses.ServerProvidedAllPackagesHash == nil {
return errServerProvidedAllPackagesHashNil
}

// Get the hash of the status before we update it.
prevHash := c.ClientSyncedState.PackageStatuses().GetHash()

if err := c.ClientSyncedState.SetPackageStatuses(statuses); err != nil {
return err
}

// Check if the new status is different from the previous by comparing the hashes.
if !bytes.Equal(prevHash, statuses.Hash) {
// Let the Server know about the new status.

c.sender.NextMessage().Update(
func(msg *protobufs.AgentToServer) {
msg.PackageStatuses = c.ClientSyncedState.PackageStatuses()
},
)
// TODO: if this call is coming from OnMessage callback don't schedule the send
// immediately, wait until the end of OnMessage to send one message only.
c.sender.ScheduleSend()
}

return nil
}
Loading

0 comments on commit 301ef45

Please sign in to comment.