Skip to content

Commit

Permalink
Add basic Agent Health reporting (#92)
Browse files Browse the repository at this point in the history
Implements spec change: open-telemetry/opamp-spec#103
  • Loading branch information
tigrannajaryan authored Jul 5, 2022
1 parent aad0403 commit 84acca9
Show file tree
Hide file tree
Showing 12 changed files with 798 additions and 474 deletions.
8 changes: 8 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ type OpAMPClient interface {
// AgentDescription returns the last value successfully set by SetAgentDescription().
AgentDescription() *protobufs.AgentDescription

// SetHealth sets the health status of the Agent. The AgentHealth will be included
// in the next status report sent to the Server. May be called before or after Start().
// May also be called from the OnMessage handler.
//
// nil health parameter is not allowed and will return an error.
SetHealth(health *protobufs.AgentHealth) error

// UpdateEffectiveConfig fetches the current local effective config using
// GetEffectiveConfig callback and sends it to the Server.
// May be called anytime after Start(), including from OnMessage handler.
Expand Down
71 changes: 71 additions & 0 deletions client/clientimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,77 @@ func TestReportAgentDescription(t *testing.T) {
})
}

// TestReportAgentHealth verifies the reporting of AgentHealth set via SetHealth:
// a nil health is rejected, a health set before Start() is carried by the first
// status report, it is omitted from the following report, and it is sent again
// after the Server replies with the ReportFullState flag.
func TestReportAgentHealth(t *testing.T) {
testClients(t, func(t *testing.T, client OpAMPClient) {

// Start a Server.
srv := internal.StartMockServer(t)
srv.EnableExpectMode()

// Prepare the client settings. The client itself is started further below,
// after the health has been set.
settings := types.StartSettings{
OpAMPServerURL: "ws://" + srv.Endpoint,
}
prepareClient(t, &settings, client)

// A nil health must be rejected.
assert.Error(t, client.SetHealth(nil))

// Set a valid health before the client is started.
sendHealth := &protobufs.AgentHealth{
Up: true,
StartTimeUnixNano: 123,
LastError: "bad error",
}
assert.NoError(t, client.SetHealth(sendHealth))

// Client --->
assert.NoError(t, client.Start(context.Background(), settings))

// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, 0, msg.SequenceNum)
// The first status report after Start must have the Health.
assert.True(t, proto.Equal(sendHealth, msg.Health))
return &protobufs.ServerToAgent{InstanceUid: msg.InstanceUid}
})

// Client --->
// Trigger a status report.
_ = client.UpdateEffectiveConfig(context.Background())

// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
// The Health was already reported and has not been set again, so this
// status report must omit it (nil).
assert.Nil(t, msg.Health)

assert.EqualValues(t, 1, msg.SequenceNum)

// Ask the client to report its full state.
return &protobufs.ServerToAgent{
InstanceUid: msg.InstanceUid,
Flags: protobufs.ServerToAgent_ReportFullState,
}
})

// Server has requested the client to report, so there will be another message
// coming to the Server.
// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, 2, msg.SequenceNum)
// The status report must again have full Health
// because the Server asked for it.
assert.True(t, proto.Equal(sendHealth, msg.Health))
return &protobufs.ServerToAgent{InstanceUid: msg.InstanceUid}
})

// Shutdown the Server.
srv.Close()

// Shutdown the client.
err := client.Stop(context.Background())
assert.NoError(t, err)
})
}

func TestReportEffectiveConfig(t *testing.T) {
testClients(t, func(t *testing.T, client OpAMPClient) {

Expand Down
4 changes: 4 additions & 0 deletions client/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ func (c *httpClient) SetAgentDescription(descr *protobufs.AgentDescription) erro
return c.common.SetAgentDescription(descr)
}

// SetHealth forwards the call to c.common.SetHealth and returns its result unchanged.
func (c *httpClient) SetHealth(health *protobufs.AgentHealth) error {
return c.common.SetHealth(health)
}

// UpdateEffectiveConfig forwards the call, including ctx, to
// c.common.UpdateEffectiveConfig and returns its result unchanged.
func (c *httpClient) UpdateEffectiveConfig(ctx context.Context) error {
return c.common.UpdateEffectiveConfig(ctx)
}
Expand Down
18 changes: 18 additions & 0 deletions client/internal/clientcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
var (
ErrAgentDescriptionMissing = errors.New("AgentDescription is nil")
ErrAgentDescriptionNoAttributes = errors.New("AgentDescription has no attributes defined")
ErrAgentHealthMissing = errors.New("AgentHealth is nil")

errAlreadyStarted = errors.New("already started")
errCannotStopNotStarted = errors.New("cannot stop because not started")
Expand Down Expand Up @@ -213,6 +214,23 @@ func (c *ClientCommon) SetAgentDescription(descr *protobufs.AgentDescription) er
return nil
}

// SetHealth records the given AgentHealth in the client's synced state and
// schedules a status update that carries it to the Server. Keeping it in the
// state allows it to be sent again later when the Server asks for it.
//
// An error returned by ClientSyncedState.SetHealth is passed back to the
// caller and nothing is scheduled in that case.
func (c *ClientCommon) SetHealth(health *protobufs.AgentHealth) error {
	// Remember the AgentHealth first so it is available for later reports.
	err := c.ClientSyncedState.SetHealth(health)
	if err != nil {
		return err
	}

	// The health is read back from the synced state at the moment the pending
	// message is updated.
	attachHealth := func(msg *protobufs.AgentToServer) {
		msg.Health = c.ClientSyncedState.Health()
	}
	c.sender.NextMessage().Update(attachHealth)
	c.sender.ScheduleSend()

	return nil
}

// UpdateEffectiveConfig fetches the current local effective config using
// GetEffectiveConfig callback and sends it to the Server using provided Sender.
func (c *ClientCommon) UpdateEffectiveConfig(ctx context.Context) error {
Expand Down
21 changes: 21 additions & 0 deletions client/internal/clientstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type ClientSyncedState struct {
mutex sync.Mutex

agentDescription *protobufs.AgentDescription
health *protobufs.AgentHealth
remoteConfigStatus *protobufs.RemoteConfigStatus
packageStatuses *protobufs.PackageStatuses
}
Expand All @@ -44,6 +45,12 @@ func (s *ClientSyncedState) AgentDescription() *protobufs.AgentDescription {
return s.agentDescription
}

// Health returns the AgentHealth currently held in the state. The field is
// read while s.mutex is held.
func (s *ClientSyncedState) Health() *protobufs.AgentHealth {
	s.mutex.Lock()
	current := s.health
	s.mutex.Unlock()

	return current
}

func (s *ClientSyncedState) RemoteConfigStatus() *protobufs.RemoteConfigStatus {
defer s.mutex.Unlock()
s.mutex.Lock()
Expand Down Expand Up @@ -75,6 +82,20 @@ func (s *ClientSyncedState) SetAgentDescription(descr *protobufs.AgentDescriptio
return nil
}

// SetHealth stores a copy of the given AgentHealth in the state.
// It returns ErrAgentHealthMissing if health is nil.
func (s *ClientSyncedState) SetHealth(health *protobufs.AgentHealth) error {
	if health == nil {
		return ErrAgentHealthMissing
	}

	// Clone before taking the lock; the state keeps its own copy so later
	// changes made by the caller to health do not affect it.
	snapshot := proto.Clone(health).(*protobufs.AgentHealth)

	s.mutex.Lock()
	s.health = snapshot
	s.mutex.Unlock()

	return nil
}

// SetRemoteConfigStatus sets the RemoteConfigStatus in the state.
func (s *ClientSyncedState) SetRemoteConfigStatus(status *protobufs.RemoteConfigStatus) error {
if status == nil {
Expand Down
3 changes: 2 additions & 1 deletion client/internal/receivedprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,11 @@ func (r *receivedProcessor) rcvFlags(
r.sender.NextMessage().Update(
func(msg *protobufs.AgentToServer) {
msg.AgentDescription = r.clientSyncedState.AgentDescription()
msg.Health = r.clientSyncedState.Health()
msg.RemoteConfigStatus = r.clientSyncedState.RemoteConfigStatus()
msg.PackageStatuses = r.clientSyncedState.PackageStatuses()

// The logic for EffectiveConfig is similar to the previous 3 messages however
// The logic for EffectiveConfig is similar to the previous 4 sub-messages however
// the EffectiveConfig is fetched using GetEffectiveConfig instead of
// from clientSyncedState. We do this to avoid keeping EffectiveConfig in-memory.
msg.EffectiveConfig = cfg
Expand Down
4 changes: 4 additions & 0 deletions client/wsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ func (c *wsClient) SetAgentDescription(descr *protobufs.AgentDescription) error
return c.common.SetAgentDescription(descr)
}

// SetHealth forwards the call to c.common.SetHealth and returns its result unchanged.
func (c *wsClient) SetHealth(health *protobufs.AgentHealth) error {
return c.common.SetHealth(health)
}

// UpdateEffectiveConfig forwards the call, including ctx, to
// c.common.UpdateEffectiveConfig and returns its result unchanged.
func (c *wsClient) UpdateEffectiveConfig(ctx context.Context) error {
return c.common.UpdateEffectiveConfig(ctx)
}
Expand Down
43 changes: 39 additions & 4 deletions internal/examples/server/data/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"crypto/sha256"
"sync"
"time"

"google.golang.org/protobuf/proto"

Expand All @@ -30,6 +31,9 @@ type Agent struct {
// Agent's current status.
Status *protobufs.AgentToServer

// The time when the agent has started. Valid only if Status.Health.Up==true
StartedAt time.Time

// Effective config reported by the Agent.
EffectiveConfig string

Expand Down Expand Up @@ -62,6 +66,7 @@ func (agent *Agent) CloneReadonly() *Agent {
EffectiveConfig: agent.EffectiveConfig,
CustomInstanceConfig: agent.CustomInstanceConfig,
remoteConfig: proto.Clone(agent.remoteConfig).(*protobufs.AgentRemoteConfig),
StartedAt: agent.StartedAt,
}
}

Expand Down Expand Up @@ -95,7 +100,7 @@ func notifyStatusWatchers(statusUpdateWatchers []chan<- struct{}) {
}
}

func (agent *Agent) updateStatusField(newStatus *protobufs.AgentToServer) (agentDescrChanged bool) {
func (agent *Agent) updateAgentDescription(newStatus *protobufs.AgentToServer) (agentDescrChanged bool) {
prevStatus := agent.Status

if agent.Status == nil {
Expand All @@ -113,9 +118,7 @@ func (agent *Agent) updateStatusField(newStatus *protobufs.AgentToServer) (agent
// (or this is the first report).
// Make full comparison of previous and new descriptions to see if it
// really is different.
if prevStatus != nil && proto.Equal(
prevStatus.AgentDescription, newStatus.AgentDescription,
) {
if prevStatus != nil && proto.Equal(prevStatus.AgentDescription, newStatus.AgentDescription) {
// Agent description didn't change.
agentDescrChanged = false
} else {
Expand All @@ -134,6 +137,38 @@ func (agent *Agent) updateStatusField(newStatus *protobufs.AgentToServer) (agent
agent.Status.RemoteConfigStatus = newStatus.RemoteConfigStatus
}
}
return agentDescrChanged
}

// updateHealth copies the Health reported in newStatus into agent.Status and,
// when the reported health says the Agent is up, refreshes agent.StartedAt
// from the reported start time (converted from Unix nanoseconds to UTC).
//
// Nothing is changed when newStatus carries no Health, or when agent.Status
// has not been initialized yet.
func (agent *Agent) updateHealth(newStatus *protobufs.AgentToServer) {
	health := newStatus.Health
	if health == nil {
		return
	}

	// Guard before the dereference below. The original checked agent.Status
	// for nil only after it had already been dereferenced.
	if agent.Status == nil {
		return
	}

	agent.Status.Health = health

	// StartedAt is only meaningful while the Agent is up, so leave the
	// previous value untouched otherwise.
	if health.Up {
		agent.StartedAt = time.Unix(0, int64(health.StartTimeUnixNano)).UTC()
	}
}

// updateRemoteConfigStatus overwrites the stored RemoteConfigStatus with the
// one carried by newStatus. A report without a RemoteConfigStatus leaves the
// stored value as is. No comparison with the previous value is made.
func (agent *Agent) updateRemoteConfigStatus(newStatus *protobufs.AgentToServer) {
	reported := newStatus.RemoteConfigStatus
	if reported == nil {
		return
	}

	agent.Status.RemoteConfigStatus = reported
}

// updateStatusField merges newStatus into the Agent's stored status by running
// the agent description, remote config status and health updates in that
// order. It reports true when this is the first status stored for the Agent
// or when updateAgentDescription reports a change.
func (agent *Agent) updateStatusField(newStatus *protobufs.AgentToServer) (agentDescrChanged bool) {
	// The very first report is remembered as is and always counts as a change.
	firstReport := agent.Status == nil
	if firstReport {
		agent.Status = newStatus
	}

	descrUpdated := agent.updateAgentDescription(newStatus)
	agent.updateRemoteConfigStatus(newStatus)
	agent.updateHealth(newStatus)

	return firstReport || descrUpdated
}
Expand Down
15 changes: 15 additions & 0 deletions internal/examples/server/uisrv/html/agent.html
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,22 @@ <h3>Agent</h3>
<table border="1" style="border-collapse: collapse">
<tr>
<td>Instance ID:</td><td>{{ .InstanceId }}</td>
</tr>
{{if .Status.Health }}
<tr>
<td>Up:</td><td>{{ .Status.Health.Up }}</td>
</tr>
{{if .Status.Health.LastError }}
<tr>
<td></td><td><span style="color:red">{{ .Status.Health.LastError }}</span></td>
</tr>
{{end}}
{{if .Status.Health.Up }}
<tr>
<td>Up since:</td><td>{{ .StartedAt }}</td>
</tr>
{{end}}
{{end}}
</table>
</td>
<td valign="top">
Expand Down
32 changes: 28 additions & 4 deletions internal/examples/supervisor/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type Supervisor struct {
// Commander that starts/stops the Agent process.
commander *commander.Commander

startedAt time.Time

// Supervisor's own config.
config config.Supervisor

Expand Down Expand Up @@ -94,7 +96,7 @@ func NewSupervisor(logger types.Logger) (*Supervisor, error) {
s.loadAgentEffectiveConfig()

if err := s.startOpAMP(); err != nil {
return nil, fmt.Errorf("Cannot startOpAMP OpAMP client: %v", err)
return nil, fmt.Errorf("Cannot start OpAMP client: %v", err)
}

var err error
Expand Down Expand Up @@ -368,8 +370,18 @@ func (s *Supervisor) recalcEffectiveConfig() (configChanged bool, err error) {
// startAgent starts the Agent process through the commander and sets the
// resulting AgentHealth on the OpAMP client.
//
// If the start fails, the error is logged once and a health with Up=false and
// the error text in LastError is set. If it succeeds, s.startedAt is set to
// the current time and a health with Up=true and that start time is set.
// A failure to set the health is logged and otherwise ignored.
func (s *Supervisor) startAgent() {
	if err := s.commander.Start(context.Background()); err != nil {
		errMsg := fmt.Sprintf("Cannot start the agent: %v", err)
		// Pass the message as an argument rather than as the format string:
		// the error text may itself contain '%' characters.
		s.logger.Errorf("%s", errMsg)

		health := &protobufs.AgentHealth{Up: false, LastError: errMsg}
		if healthErr := s.opampClient.SetHealth(health); healthErr != nil {
			s.logger.Errorf("Cannot set agent health: %v", healthErr)
		}
		return
	}

	s.startedAt = time.Now()

	health := &protobufs.AgentHealth{
		Up:                true,
		StartTimeUnixNano: uint64(s.startedAt.UnixNano()),
	}
	if healthErr := s.opampClient.SetHealth(health); healthErr != nil {
		s.logger.Errorf("Cannot set agent health: %v", healthErr)
	}
}

func (s *Supervisor) runAgentProcess() {
Expand All @@ -388,8 +400,12 @@ func (s *Supervisor) runAgentProcess() {
s.applyConfigWithAgentRestart()

case <-s.commander.Done():
s.logger.Debugf("Agent process PID=%d exited unexpectedly, exit code=%d. Will restart in a bit...",
s.commander.Pid(), s.commander.ExitCode())
errMsg := fmt.Sprintf(
"Agent process PID=%d exited unexpectedly, exit code=%d. Will restart in a bit...",
s.commander.Pid(), s.commander.ExitCode(),
)
s.logger.Debugf(errMsg)
s.opampClient.SetHealth(&protobufs.AgentHealth{Up: false, LastError: errMsg})

// TODO: decide why the agent stopped. If it was due to bad config, report it to server.

Expand Down Expand Up @@ -423,7 +439,15 @@ func (s *Supervisor) writeEffectiveConfigToFile(cfg string, filePath string) {

// Shutdown stops the Agent process via the commander (when one is set), then,
// when an OpAMP client is set, sets a final health with Up=false on it and
// stops the client.
func (s *Supervisor) Shutdown() {
	s.logger.Debugf("Supervisor shutting down...")

	if s.commander != nil {
		s.commander.Stop(context.Background())
	}

	if s.opampClient == nil {
		return
	}

	// Set the final health before the client is stopped.
	finalHealth := &protobufs.AgentHealth{
		Up:        false,
		LastError: "Supervisor is shutdown",
	}
	s.opampClient.SetHealth(finalHealth)

	_ = s.opampClient.Stop(context.Background())
}
Expand Down
Loading

0 comments on commit 84acca9

Please sign in to comment.