Skip to content

Commit

Permalink
Fix component control protocol to allow checkin to be chunked across …
Browse files Browse the repository at this point in the history
…multiple messages (#3884)

* Add chunking support to agent.

* Add chunking test.

* Add changelog.

* Update to v7.6.0 client and fixes from code review.

* Updates from code review. Improve test.

* Update to elastic-agent-client v7.7.0. Remove usage of Version from component and update to StartupInfo.

* Run mage check.

* Update to v7.8.0 for elastic-agent-client.

* Fix unit test on Windows.

---------

Co-authored-by: Pierre HILBERT <pierre.hilbert@elastic.co>
  • Loading branch information
blakerouse and pierrehilbert authored Jan 18, 2024
1 parent 1e06443 commit caaf078
Show file tree
Hide file tree
Showing 23 changed files with 540 additions and 359 deletions.
4 changes: 2 additions & 2 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -933,11 +933,11 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-a

--------------------------------------------------------------------------------
Dependency : github.com/elastic/elastic-agent-client/v7
Version: v7.5.0
Version: v7.8.0
Licence type (autodetected): Elastic
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-client/v7@v7.5.0/LICENSE.txt:
Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-client/v7@v7.8.0/LICENSE.txt:

ELASTIC LICENSE AGREEMENT

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# Change summary; a 80ish characters long description of the change.
summary: Fix component control protocol to allow checkin to be chunked across multiple messages

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component:

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/elastic-agent/pull/3884

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/2460
3 changes: 1 addition & 2 deletions control_v2.proto
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,7 @@ message ComponentUnitState {
message ComponentVersionInfo {
// Name of the component.
string name = 1;
// Version of the component.
string version = 2;
// 2 reserved - used to be used for version of component.
// Extra meta information about the version.
map<string, string> meta = 3;
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/dolmen-go/contextio v0.0.0-20200217195037-68fc5150bcd5
github.com/elastic/e2e-testing v1.1.0
github.com/elastic/elastic-agent-autodiscover v0.6.6
github.com/elastic/elastic-agent-client/v7 v7.5.0
github.com/elastic/elastic-agent-client/v7 v7.8.0
github.com/elastic/elastic-agent-libs v0.7.3
github.com/elastic/elastic-agent-system-metrics v0.9.1
github.com/elastic/elastic-transport-go/v8 v8.3.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -798,8 +798,8 @@ github.com/elastic/e2e-testing v1.1.0 h1:Y+K215EWkf3ojAWmBK2JrxH/rITjkKM1zR8mnwI
github.com/elastic/e2e-testing v1.1.0/go.mod h1:8q2d8dmwavJXISowwaoreHFBnbR/uK4qanfRGhC/W9A=
github.com/elastic/elastic-agent-autodiscover v0.6.6 h1:P1y0dDpbhJc7Uw/xe85irPEad4Vljygc+y4iSxtqW7A=
github.com/elastic/elastic-agent-autodiscover v0.6.6/go.mod h1:chulyCAyZb/njMHgzkhC/yWnt8v/Y6eCRUhmFVnsA5o=
github.com/elastic/elastic-agent-client/v7 v7.5.0 h1:niI3WQ+01Lnp2r5LxK8SyNhrPJe13vBiOkqrDRK2oTA=
github.com/elastic/elastic-agent-client/v7 v7.5.0/go.mod h1:DYoX95xjC4BW/p2avyu724Qr2+hoUIz9eCU9CVS1d+0=
github.com/elastic/elastic-agent-client/v7 v7.8.0 h1:GHFzDJIWpdgI0qDk5EcqbQJGvwTsl2E2vQK3/xe+MYQ=
github.com/elastic/elastic-agent-client/v7 v7.8.0/go.mod h1:ihtjqJzYiIltlRhNruaSSc0ogxIhqPD5hOMKq16cI1s=
github.com/elastic/elastic-agent-libs v0.7.3 h1:tc6JDXYR+2XFMHJVv+7+M0OwAbZPxm3caLJEd943dlE=
github.com/elastic/elastic-agent-libs v0.7.3/go.mod h1:9hlSaDPm0XTrUWrZjwvckgov1pDHnsGyybzAjNe/1wA=
github.com/elastic/elastic-agent-system-metrics v0.9.1 h1:r0ofKHgPpl+W09ie7tzGcCDC0d4NZbQUv37rSgHf4FM=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,8 +427,7 @@ func TestDiagnosticState(t *testing.T) {
State: client.UnitStateDegraded,
Message: "degraded message",
VersionInfo: runtime.ComponentVersionInfo{
Name: "version name",
Version: "version value",
Name: "version name",
},
},
},
Expand Down Expand Up @@ -462,7 +461,6 @@ components:
units: {}
version_info:
name: "version name"
version: "version value"
upgrade_details:
target_version: 8.12.0
state: UPG_DOWNLOADING
Expand Down Expand Up @@ -505,8 +503,7 @@ func TestDiagnosticStateForAPM(t *testing.T) {
State: client.UnitStateDegraded,
Message: "degraded message",
VersionInfo: runtime.ComponentVersionInfo{
Name: "version name",
Version: "version value",
Name: "version name",
},
Component: &proto.Component{
ApmConfig: &proto.APMConfig{
Expand Down Expand Up @@ -543,7 +540,6 @@ components:
units: {}
version_info:
name: "version name"
version: "version value"
component:
apmconfig:
elastic:
Expand Down
14 changes: 8 additions & 6 deletions internal/pkg/agent/configuration/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,19 @@ import "fmt"

// GRPCConfig is a configuration of GRPC server.
type GRPCConfig struct {
Address string `config:"address"`
Port uint16 `config:"port"`
MaxMsgSize int `config:"max_message_size"`
Address string `config:"address"`
Port uint16 `config:"port"`
MaxMsgSize int `config:"max_message_size"`
CheckinChunkingDisabled bool `config:"checkin_chunking_disabled"`
}

// DefaultGRPCConfig creates a default server configuration.
func DefaultGRPCConfig() *GRPCConfig {
return &GRPCConfig{
Address: "localhost",
Port: 6789,
MaxMsgSize: 1024 * 1024 * 100, // grpc default 4MB is unsufficient for diagnostics
Address: "localhost",
Port: 6789,
MaxMsgSize: 1024 * 1024 * 100, // grpc default 4MB is unsufficient for diagnostics
CheckinChunkingDisabled: false, // on by default
}
}

Expand Down
6 changes: 3 additions & 3 deletions internal/pkg/diagnostics/diagnostics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
)

func TestRedactResults(t *testing.T) {
privKey := `-----BEGIN OPENSSH PRIVATE KEY-----
privKey := `-----BEGIN OPENSSH PRIVATE KEY-----
b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAACFwAAAAdzc2gtcn
NhAAAAAwEAAQAAAgEAnomdLTF3Vp52cT55PealM+qwSQVEkEBsKB3dSXEIvCqvOmDRZic6
mjhkShOBr6nHUzhdHiNlMTNUaU0AxyuMofFnCCBVhnnC9w+CnTrL+lbVXmMClTrbqIGT8g
Expand Down Expand Up @@ -126,7 +126,7 @@ sequence:
- one
- two
mapping:
? sky
? sky
: blue
? sea : green`

Expand All @@ -149,7 +149,7 @@ func TestUnitAndStateMapping(t *testing.T) {
{UnitType: agentclient.UnitTypeInput, UnitID: "test-unit"}: {Message: "test unit"},
{UnitType: agentclient.UnitTypeOutput, UnitID: "test-unit-2"}: {Message: "test unit 2"},
},
VersionInfo: agentruntime.ComponentVersionInfo{Name: "test-component", Version: "0"},
VersionInfo: agentruntime.ComponentVersionInfo{Name: "test-component"},
}

formatted, err := yaml.Marshal(exampleState)
Expand Down
3 changes: 1 addition & 2 deletions pkg/component/fake/component/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ func main() {
func run() error {
logger := zerolog.New(os.Stderr).Level(zerolog.TraceLevel).With().Timestamp().Logger()
ver := client.VersionInfo{
Name: comp.Fake,
Version: "1.0",
Name: comp.Fake,
Meta: map[string]string{
"input": comp.Fake,
},
Expand Down
3 changes: 1 addition & 2 deletions pkg/component/fake/shipper/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ func main() {
func run() error {
logger := zerolog.New(os.Stderr).With().Timestamp().Logger()
ver := client.VersionInfo{
Name: fakeShipper,
Version: "1.0",
Name: fakeShipper,
Meta: map[string]string{
"shipper": fakeShipper,
},
Expand Down
20 changes: 10 additions & 10 deletions pkg/component/runtime/conn_info_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ import (
)

type mockCommunicator struct {
ch chan *proto.CheckinObserved
connInfo *proto.ConnInfo
ch chan *proto.CheckinObserved
startupInfo *proto.StartUpInfo
}

func newMockCommunicator() *mockCommunicator {
return &mockCommunicator{
ch: make(chan *proto.CheckinObserved, 1),
connInfo: &proto.ConnInfo{
startupInfo: &proto.StartUpInfo{
Addr: getAddress(),
ServerName: "endpoint",
Token: "some token",
Expand All @@ -45,7 +45,7 @@ func newMockCommunicator() *mockCommunicator {
}

func (c *mockCommunicator) WriteConnInfo(w io.Writer, services ...client.Service) error {
infoBytes, err := protobuf.Marshal(c.connInfo)
infoBytes, err := protobuf.Marshal(c.startupInfo)
if err != nil {
return fmt.Errorf("failed to marshal connection information: %w", err)
}
Expand Down Expand Up @@ -104,14 +104,14 @@ func TestConnInfoNormal(t *testing.T) {
t.Fatal(err)
}

var connInfo proto.ConnInfo
err = protobuf.Unmarshal(b, &connInfo)
var startupInfo proto.StartUpInfo
err = protobuf.Unmarshal(b, &startupInfo)
if err != nil {
t.Fatal(err)
}

// Check the received result
diff := cmp.Diff(&connInfo, comm.connInfo, cmpopts.IgnoreUnexported(proto.ConnInfo{}))
diff := cmp.Diff(&startupInfo, comm.startupInfo, cmpopts.IgnoreUnexported(proto.StartUpInfo{}))
if diff != "" {
t.Error(diff)
}
Expand Down Expand Up @@ -158,14 +158,14 @@ func TestConnInfoConnCloseThenAnotherConn(t *testing.T) {
t.Fatal(err)
}

var connInfo proto.ConnInfo
err = protobuf.Unmarshal(b, &connInfo)
var startupInfo proto.StartUpInfo
err = protobuf.Unmarshal(b, &startupInfo)
if err != nil {
t.Fatal(err)
}

// Check the received result
diff := cmp.Diff(&connInfo, comm.connInfo, cmpopts.IgnoreUnexported(proto.ConnInfo{}))
diff := cmp.Diff(&startupInfo, comm.startupInfo, cmpopts.IgnoreUnexported(proto.StartUpInfo{}))
if diff != "" {
t.Error(diff)
}
Expand Down
19 changes: 19 additions & 0 deletions pkg/component/runtime/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,25 @@ func (m *Manager) CheckinV2(server proto.ElasticAgent_CheckinV2Server) error {
return status.Error(codes.PermissionDenied, "invalid token")
}

// enable chunking with the communicator if the initial checkin
// states that it supports chunking
runtime.comm.chunkingAllowed = false
for _, support := range initCheckin.Supports {
if support == proto.ConnectionSupports_CheckinChunking {
runtime.comm.chunkingAllowed = true
break
}
}
if runtime.comm.chunkingAllowed {
if m.grpcConfig.CheckinChunkingDisabled {
// chunking explicitly disabled
runtime.comm.chunkingAllowed = false
runtime.logger.Warn("control checkin v2 protocol supports chunking, but chunking was explicitly disabled")
} else {
runtime.logger.Info("control checkin v2 protocol has chunking enabled")
}
}

return runtime.comm.checkin(server, initCheckin)
}

Expand Down
Loading

0 comments on commit caaf078

Please sign in to comment.