Skip to content

Commit

Permalink
Move messages in StatusReport to AgentToServer message (#83)
Browse files Browse the repository at this point in the history
StatusReport was an unnecessary message, remaining from the time
when we had DataToAgent which had a oneof field, and it was
impossible to set multiple fields that together form the status.
All its fields are moved to AgentToServer message now and can be
set directly there (there is no longer a oneof limitation so now
this is possible to do).

Implements spec change: open-telemetry/opamp-spec#85
  • Loading branch information
tigrannajaryan authored May 25, 2022
1 parent b655f04 commit afb132b
Show file tree
Hide file tree
Showing 12 changed files with 619 additions and 725 deletions.
34 changes: 15 additions & 19 deletions client/clientimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ func TestIncludesDetailsOnReconnect(t *testing.T) {
var receivedDetails int64
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
// Track when we receive AgentDescription
if msg.StatusReport.AgentDescription != nil {
if msg.AgentDescription != nil {
atomic.AddInt64(&receivedDetails, 1)
}

Expand Down Expand Up @@ -426,10 +426,8 @@ func TestSetEffectiveConfig(t *testing.T) {
srv := internal.StartMockServer(t)
var rcvConfig atomic.Value
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
if statusReport := msg.GetStatusReport(); statusReport != nil {
if statusReport.EffectiveConfig != nil {
rcvConfig.Store(statusReport.EffectiveConfig)
}
if msg.EffectiveConfig != nil {
rcvConfig.Store(msg.EffectiveConfig)
}
return nil
}
Expand Down Expand Up @@ -487,10 +485,8 @@ func TestSetAgentDescription(t *testing.T) {
srv := internal.StartMockServer(t)
var rcvAgentDescr atomic.Value
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
if statusReport := msg.GetStatusReport(); statusReport != nil {
if statusReport.AgentDescription != nil {
rcvAgentDescr.Store(statusReport.AgentDescription)
}
if msg.AgentDescription != nil {
rcvAgentDescr.Store(msg.AgentDescription)
}
return nil
}
Expand Down Expand Up @@ -632,7 +628,7 @@ func TestConnectionSettings(t *testing.T) {
// Start a Server.
srv := internal.StartMockServer(t)
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
if statusReport := msg.GetStatusReport(); statusReport != nil {
if msg != nil {
atomic.AddInt64(&rcvStatus, 1)

return &protobufs.ServerToAgent{
Expand Down Expand Up @@ -731,7 +727,7 @@ func TestReportAgentDescription(t *testing.T) {
// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
// The first status report after Start must have full AgentDescription.
assert.True(t, proto.Equal(client.AgentDescription(), msg.StatusReport.AgentDescription))
assert.True(t, proto.Equal(client.AgentDescription(), msg.AgentDescription))
return &protobufs.ServerToAgent{InstanceUid: msg.InstanceUid}
})

Expand All @@ -742,7 +738,7 @@ func TestReportAgentDescription(t *testing.T) {
// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
// The status report must have compressed AgentDescription.
descr := msg.StatusReport.AgentDescription
descr := msg.AgentDescription
assert.Nil(t, descr.IdentifyingAttributes)
assert.Nil(t, descr.NonIdentifyingAttributes)

Expand All @@ -763,7 +759,7 @@ func TestReportAgentDescription(t *testing.T) {
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
// The status report must again have full AgentDescription
// because the Server asked for it.
assert.True(t, proto.Equal(client.AgentDescription(), msg.StatusReport.AgentDescription))
assert.True(t, proto.Equal(client.AgentDescription(), msg.AgentDescription))
return &protobufs.ServerToAgent{InstanceUid: msg.InstanceUid}
})

Expand Down Expand Up @@ -802,7 +798,7 @@ func TestReportEffectiveConfig(t *testing.T) {
// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
// The first status report after Start must have full EffectiveConfig.
assert.True(t, proto.Equal(clientEffectiveConfig, msg.StatusReport.EffectiveConfig))
assert.True(t, proto.Equal(clientEffectiveConfig, msg.EffectiveConfig))
return &protobufs.ServerToAgent{InstanceUid: msg.InstanceUid}
})

Expand All @@ -813,7 +809,7 @@ func TestReportEffectiveConfig(t *testing.T) {
// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
// The status report must have compressed EffectiveConfig.
cfg := msg.StatusReport.EffectiveConfig
cfg := msg.EffectiveConfig
assert.Nil(t, cfg.ConfigMap)

// Hash must be present and unchanged.
Expand All @@ -832,7 +828,7 @@ func TestReportEffectiveConfig(t *testing.T) {
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
// The status report must again have full EffectiveConfig
// because Server asked for it.
assert.True(t, proto.Equal(clientEffectiveConfig, msg.StatusReport.EffectiveConfig))
assert.True(t, proto.Equal(clientEffectiveConfig, msg.EffectiveConfig))
return &protobufs.ServerToAgent{InstanceUid: msg.InstanceUid}
})

Expand Down Expand Up @@ -893,7 +889,7 @@ func verifyRemoteConfigUpdate(t *testing.T, successCase bool, expectStatus *prot
// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
// Verify that the remote config status is as expected.
status := msg.StatusReport.RemoteConfigStatus
status := msg.RemoteConfigStatus
assert.EqualValues(t, expectStatus.Status, status.Status)
assert.Equal(t, expectStatus.ErrorMessage, status.ErrorMessage)
assert.EqualValues(t, remoteCfg.ConfigHash, status.LastRemoteConfigHash)
Expand All @@ -912,7 +908,7 @@ func verifyRemoteConfigUpdate(t *testing.T, successCase bool, expectStatus *prot
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
// This time all fields except Hash must be unset. This is expected
// as compression in OpAMP.
status := msg.StatusReport.RemoteConfigStatus
status := msg.RemoteConfigStatus
assert.EqualValues(t, firstConfigStatus.Hash, status.Hash)
assert.EqualValues(t, protobufs.RemoteConfigStatus_UNSET, status.Status)
assert.EqualValues(t, "", status.ErrorMessage)
Expand All @@ -928,7 +924,7 @@ func verifyRemoteConfigUpdate(t *testing.T, successCase bool, expectStatus *prot
// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
// Exact same full status must be present again.
status := msg.StatusReport.RemoteConfigStatus
status := msg.RemoteConfigStatus
assert.True(t, proto.Equal(status, firstConfigStatus))

return &protobufs.ServerToAgent{InstanceUid: msg.InstanceUid}
Expand Down
2 changes: 1 addition & 1 deletion client/httpclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestHTTPPolling(t *testing.T) {
srv := internal.StartMockServer(t)
var rcvCounter int64
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
if statusReport := msg.GetStatusReport(); statusReport != nil {
if msg != nil {
atomic.AddInt64(&rcvCounter, 1)
}
return nil
Expand Down
21 changes: 9 additions & 12 deletions client/internal/clientcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,18 +176,15 @@ func (c *ClientCommon) PrepareFirstMessage(ctx context.Context) error {

c.sender.NextMessage().Update(
func(msg *protobufs.AgentToServer) {
if msg.StatusReport == nil {
msg.StatusReport = &protobufs.StatusReport{}
}
msg.StatusReport.AgentDescription = c.ClientSyncedState.AgentDescription()
msg.StatusReport.EffectiveConfig = cfg
msg.StatusReport.RemoteConfigStatus = c.ClientSyncedState.RemoteConfigStatus()
msg.AgentDescription = c.ClientSyncedState.AgentDescription()
msg.EffectiveConfig = cfg
msg.RemoteConfigStatus = c.ClientSyncedState.RemoteConfigStatus()
msg.PackageStatuses = c.ClientSyncedState.PackageStatuses()

if c.PackagesStateProvider != nil {
// We have a state provider, so package related capabilities can work.
msg.StatusReport.Capabilities |= protobufs.AgentCapabilities_AcceptsPackages
msg.StatusReport.Capabilities |= protobufs.AgentCapabilities_ReportsPackageStatuses
msg.Capabilities |= protobufs.AgentCapabilities_AcceptsPackages
msg.Capabilities |= protobufs.AgentCapabilities_ReportsPackageStatuses
}
},
)
Expand All @@ -208,8 +205,8 @@ func (c *ClientCommon) SetAgentDescription(descr *protobufs.AgentDescription) er
if err := c.ClientSyncedState.SetAgentDescription(descr); err != nil {
return err
}
c.sender.NextMessage().UpdateStatus(func(statusReport *protobufs.StatusReport) {
statusReport.AgentDescription = c.ClientSyncedState.AgentDescription()
c.sender.NextMessage().Update(func(msg *protobufs.AgentToServer) {
msg.AgentDescription = c.ClientSyncedState.AgentDescription()
})
c.sender.ScheduleSend()
return nil
Expand Down Expand Up @@ -257,8 +254,8 @@ func (c *ClientCommon) UpdateEffectiveConfig(ctx context.Context) error {
calcHashEffectiveConfig(cfg)
}
// Send it to the Server.
c.sender.NextMessage().UpdateStatus(func(statusReport *protobufs.StatusReport) {
statusReport.EffectiveConfig = cfg
c.sender.NextMessage().Update(func(msg *protobufs.AgentToServer) {
msg.EffectiveConfig = cfg
})
c.sender.ScheduleSend()

Expand Down
2 changes: 1 addition & 1 deletion client/internal/httpsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (h *HTTPSender) Run(

case <-pollingTimer.C:
// Polling interval has passed. Force a status update.
h.NextMessage().UpdateStatus(func(statusReport *protobufs.StatusReport) {})
h.NextMessage().Update(func(msg *protobufs.AgentToServer) {})
// This will make hasPendingMessage channel readable, so we will enter
// the case above on the next iteration of the loop.
h.ScheduleSend()
Expand Down
31 changes: 8 additions & 23 deletions client/internal/nextmessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,6 @@ func (s *NextMessage) Update(modifier func(msg *protobufs.AgentToServer)) {
s.messageMutex.Unlock()
}

// UpdateStatus applies the specified modifier function to the status report that
// will be sent next and marks the status report as pending to be sent.
func (s *NextMessage) UpdateStatus(modifier func(statusReport *protobufs.StatusReport)) {
s.Update(
func(msg *protobufs.AgentToServer) {
if s.nextMessage.StatusReport == nil {
s.nextMessage.StatusReport = &protobufs.StatusReport{}
}
modifier(s.nextMessage.StatusReport)
},
)
}

// PopPending returns the next message to be sent, if it is pending or nil otherwise.
// Clears the "pending" flag.
func (s *NextMessage) PopPending() *protobufs.AgentToServer {
Expand All @@ -55,22 +42,20 @@ func (s *NextMessage) PopPending() *protobufs.AgentToServer {
// next report after this one. Keep the "hash" fields.
msg := protobufs.AgentToServer{
InstanceUid: s.nextMessage.InstanceUid,
StatusReport: &protobufs.StatusReport{
AgentDescription: &protobufs.AgentDescription{
Hash: s.nextMessage.StatusReport.AgentDescription.Hash,
},
AgentDescription: &protobufs.AgentDescription{
Hash: s.nextMessage.AgentDescription.Hash,
},
}

if s.nextMessage.StatusReport.EffectiveConfig != nil {
msg.StatusReport.EffectiveConfig = &protobufs.EffectiveConfig{
Hash: s.nextMessage.StatusReport.EffectiveConfig.Hash,
if s.nextMessage.EffectiveConfig != nil {
msg.EffectiveConfig = &protobufs.EffectiveConfig{
Hash: s.nextMessage.EffectiveConfig.Hash,
}
}

if s.nextMessage.StatusReport.RemoteConfigStatus != nil {
msg.StatusReport.RemoteConfigStatus = &protobufs.RemoteConfigStatus{
Hash: s.nextMessage.StatusReport.RemoteConfigStatus.Hash,
if s.nextMessage.RemoteConfigStatus != nil {
msg.RemoteConfigStatus = &protobufs.RemoteConfigStatus{
Hash: s.nextMessage.RemoteConfigStatus.Hash,
}
}

Expand Down
23 changes: 11 additions & 12 deletions client/internal/receivedprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,15 @@ func (r *receivedProcessor) rcvFlags(
// send to the Server.

if flags&protobufs.ServerToAgent_ReportAgentDescription != 0 {
r.sender.NextMessage().UpdateStatus(func(statusReport *protobufs.StatusReport) {
statusReport.AgentDescription = r.clientSyncedState.AgentDescription()
r.sender.NextMessage().Update(func(msg *protobufs.AgentToServer) {
msg.AgentDescription = r.clientSyncedState.AgentDescription()
})
scheduleSend = true
}

if flags&protobufs.ServerToAgent_ReportRemoteConfigStatus != 0 {
r.sender.NextMessage().UpdateStatus(func(statusReport *protobufs.StatusReport) {
statusReport.RemoteConfigStatus = r.clientSyncedState.RemoteConfigStatus()
r.sender.NextMessage().Update(func(msg *protobufs.AgentToServer) {
msg.RemoteConfigStatus = r.clientSyncedState.RemoteConfigStatus()
})
scheduleSend = true
}
Expand All @@ -112,8 +112,8 @@ func (r *receivedProcessor) rcvFlags(
if err != nil {
return false, err
}
r.sender.NextMessage().UpdateStatus(func(statusReport *protobufs.StatusReport) {
statusReport.EffectiveConfig = cfg
r.sender.NextMessage().Update(func(msg *protobufs.AgentToServer) {
msg.EffectiveConfig = cfg
})
scheduleSend = true
}
Expand Down Expand Up @@ -163,19 +163,18 @@ func (r *receivedProcessor) rcvRemoteConfig(
r.callbacks.SaveRemoteConfigStatus(ctx, cfgStatus)

// Include the config status in the next message to the Server.
r.sender.NextMessage().UpdateStatus(
func(statusReport *protobufs.StatusReport) {
statusReport.RemoteConfigStatus = cfgStatus
})
r.sender.NextMessage().Update(func(msg *protobufs.AgentToServer) {
msg.RemoteConfigStatus = cfgStatus
})

reportStatus = true
}

// Report the effective configuration if it changed and remote config was applied
// successfully.
if changed && applyErr == nil {
r.sender.NextMessage().UpdateStatus(func(statusReport *protobufs.StatusReport) {
statusReport.EffectiveConfig = effective
r.sender.NextMessage().Update(func(msg *protobufs.AgentToServer) {
msg.EffectiveConfig = effective
})
reportStatus = true
}
Expand Down
4 changes: 2 additions & 2 deletions client/internal/wsreceiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ func TestOnRemoteConfigReportsError(t *testing.T) {
RemoteConfig: &protobufs.AgentRemoteConfig{},
})

gotStatus := receiver.sender.NextMessage().nextMessage.StatusReport.RemoteConfigStatus.Status
gotMsg := receiver.sender.NextMessage().nextMessage.StatusReport.RemoteConfigStatus.ErrorMessage
gotStatus := receiver.sender.NextMessage().nextMessage.RemoteConfigStatus.Status
gotMsg := receiver.sender.NextMessage().nextMessage.RemoteConfigStatus.ErrorMessage

assert.Equal(t, expectStatus, gotStatus)
assert.Equal(t, expectMsg, gotMsg)
Expand Down
8 changes: 4 additions & 4 deletions client/types/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Callbacks interface {
// the effective config has changed.
//
// The returned effective config or the error will be reported back to the Server
// via StatusReport message (using EffectiveConfig and RemoteConfigStatus fields).
// via AgentToServer message (using EffectiveConfig and RemoteConfigStatus fields).
//
// Only one OnRemoteConfig call can be active at any time. Until OnRemoteConfig
// returns it will not be called again. Any other remote configs received from
Expand Down Expand Up @@ -76,7 +76,7 @@ type Callbacks interface {
// The Agent should process the offer and return an error if the Agent does not
// want to accept the settings (e.g. if the TSL certificate in the settings
// cannot be verified). The returned error will be reported back to the Server
// via StatusReport message (using ConnectionStatuses field).
// via AgentToServer message (using ConnectionStatuses field).
//
// If OnOpampConnectionSettings returns nil and then the caller will
// attempt to reconnect to the OpAMP Server using the new settings.
Expand Down Expand Up @@ -108,7 +108,7 @@ type Callbacks interface {
// The Agent should process the settings and return an error if the Agent does not
// want to accept the settings (e.g. if the TSL certificate in the settings
// cannot be verified). The returned error will be reported back to the Server
// via StatusReport message (using ConnectionStatuses field).
// via AgentToServer message (using ConnectionStatuses field).
// If the Agent accepts the settings it should return nil and begin sending
// its own telemetry to the destination specified in the settings.
// We currently support 3 types of Agent's own telemetry: metrics, traces, logs.
Expand All @@ -127,7 +127,7 @@ type Callbacks interface {
// The Agent should process the settings and return an error if the Agent does not
// want to accept the settings (e.g. if the TSL certificate in the settings
// cannot be verified). The returned error will be reported back to the Server
// via StatusReport message (using ConnectionStatuses field).
// via AgentToServer message (using ConnectionStatuses field).
OnOtherConnectionSettings(
ctx context.Context,
name string,
Expand Down
Loading

0 comments on commit afb132b

Please sign in to comment.