diff --git a/.buildkite/pipeline.elastic-agent-package.yml b/.buildkite/pipeline.elastic-agent-package.yml index eb53280b0fd..25c538f25f9 100644 --- a/.buildkite/pipeline.elastic-agent-package.yml +++ b/.buildkite/pipeline.elastic-agent-package.yml @@ -134,6 +134,8 @@ steps: depends_on: package agents: provider: "gcp" + machineType: "n2-standard-8" + diskSizeGb: 250 env: DRA_PROJECT_ID: "elastic-agent-package" DRA_PROJECT_ARTIFACT_ID: "agent-package" diff --git a/.buildkite/scripts/steps/integration_tests.sh b/.buildkite/scripts/steps/integration_tests.sh index 4143e2413e5..4936c62e6b6 100755 --- a/.buildkite/scripts/steps/integration_tests.sh +++ b/.buildkite/scripts/steps/integration_tests.sh @@ -11,7 +11,7 @@ MAGE_SUBTARGET="${3:-""}" # Override the agent package version using a string with format .. # NOTE: use only after version bump when the new version is not yet available, for example: # OVERRIDE_AGENT_PACKAGE_VERSION="8.10.3" otherwise OVERRIDE_AGENT_PACKAGE_VERSION="". -OVERRIDE_AGENT_PACKAGE_VERSION="" +OVERRIDE_AGENT_PACKAGE_VERSION="8.14.0" if [[ -n "$OVERRIDE_AGENT_PACKAGE_VERSION" ]]; then OVERRIDE_TEST_AGENT_VERSION=${OVERRIDE_AGENT_PACKAGE_VERSION}"-SNAPSHOT" diff --git a/.mergify.yml b/.mergify.yml index 6da091c7e2a..ddfee7828e5 100644 --- a/.mergify.yml +++ b/.mergify.yml @@ -306,3 +306,16 @@ pull_request_rules: labels: - "backport" title: "[{{ destination_branch }}](backport #{{ number }}) {{ title }}" + - name: backport patches to 8.14 branch + conditions: + - merged + - label=backport-v8.14.0 + actions: + backport: + assignees: + - "{{ author }}" + branches: + - "8.14" + labels: + - "backport" + title: "[{{ destination_branch }}](backport #{{ number }}) {{ title }}" diff --git a/changelog/fragments/1711355973-Introduce-non-grouping-for-input-spec.yaml b/changelog/fragments/1711355973-Introduce-non-grouping-for-input-spec.yaml new file mode 100644 index 00000000000..371b83edbba --- /dev/null +++ 
b/changelog/fragments/1711355973-Introduce-non-grouping-for-input-spec.yaml @@ -0,0 +1,34 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: feature + +# Change summary; a 80ish characters long description of the change. +summary: Introduce isolate units for input spec. + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +description: | + Introduce a new flag in the spec that will allow to disable grouping of same input type inputs into a single component. + If that flag is being activated the inputs won't be grouped and we will create a separate component for each input that will run units for that particular input. + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: elastic-agent + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. 
+pr: https://github.com/elastic/elastic-agent/pull/4476 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +issue: https://github.com/elastic/security-team/issues/8669 diff --git a/changelog/fragments/1712583231-leader-election-issue.yaml b/changelog/fragments/1712583231-leader-election-issue.yaml new file mode 100644 index 00000000000..d5980be668f --- /dev/null +++ b/changelog/fragments/1712583231-leader-election-issue.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: bug-fix + +# Change summary; a 80ish characters long description of the change. +summary: Fix issue where kubernetes_leaderelection provider would not try to reacquire the lease once lost + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: elastic-agent + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. 
+# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: https://github.com/elastic/elastic-agent/pull/4542 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +issue: https://github.com/elastic/elastic-agent/issues/4543 diff --git a/deploy/kubernetes/elastic-agent-managed-kubernetes.yaml b/deploy/kubernetes/elastic-agent-managed-kubernetes.yaml index 3a9d2e6907c..8401e6a97d3 100644 --- a/deploy/kubernetes/elastic-agent-managed-kubernetes.yaml +++ b/deploy/kubernetes/elastic-agent-managed-kubernetes.yaml @@ -30,7 +30,7 @@ spec: dnsPolicy: ClusterFirstWithHostNet containers: - name: elastic-agent - image: docker.elastic.co/beats/elastic-agent:8.14.0 + image: docker.elastic.co/beats/elastic-agent:8.15.0 env: # Set to 1 for enrollment into Fleet server. 
If not set, Elastic Agent is run in standalone mode - name: FLEET_ENROLL diff --git a/deploy/kubernetes/elastic-agent-standalone-kubernetes.yaml b/deploy/kubernetes/elastic-agent-standalone-kubernetes.yaml index cb1f5802eac..a551a6f1d74 100644 --- a/deploy/kubernetes/elastic-agent-standalone-kubernetes.yaml +++ b/deploy/kubernetes/elastic-agent-standalone-kubernetes.yaml @@ -698,13 +698,13 @@ spec: # - -c # - >- # mkdir -p /etc/elastic-agent/inputs.d && - # wget -O - https://github.com/elastic/elastic-agent/archive/8.14.tar.gz | tar xz -C /etc/elastic-agent/inputs.d --strip=5 "elastic-agent-8.14/deploy/kubernetes/elastic-agent-standalone/templates.d" + # wget -O - https://github.com/elastic/elastic-agent/archive/8.15.tar.gz | tar xz -C /etc/elastic-agent/inputs.d --strip=5 "elastic-agent-8.15/deploy/kubernetes/elastic-agent-standalone/templates.d" # volumeMounts: # - name: external-inputs # mountPath: /etc/elastic-agent/inputs.d containers: - name: elastic-agent-standalone - image: docker.elastic.co/beats/elastic-agent:8.14.0 + image: docker.elastic.co/beats/elastic-agent:8.15.0 args: ["-c", "/etc/elastic-agent/agent.yml", "-e"] env: # The basic authentication username used to connect to Elasticsearch diff --git a/internal/pkg/agent/application/coordinator/coordinator_test.go b/internal/pkg/agent/application/coordinator/coordinator_test.go index 3cfeddb32af..18c2c3496c3 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_test.go @@ -61,6 +61,19 @@ var ( }, }, } + fakeIsolatedUnitsInputSpec = component.InputSpec{ + Name: "fake-isolated-units", + Platforms: []string{fmt.Sprintf("%s/%s", goruntime.GOOS, goruntime.GOARCH)}, + Shippers: []string{"fake-shipper"}, + Command: &component.CommandSpec{ + Timeouts: component.CommandTimeoutSpec{ + Checkin: 30 * time.Second, + Restart: 10 * time.Millisecond, // quick restart during tests + Stop: 30 * time.Second, + }, + }, + IsolateUnits: true, 
+ } fakeShipperSpec = component.ShipperSpec{ Name: "fake-shipper", Platforms: []string{fmt.Sprintf("%s/%s", goruntime.GOOS, goruntime.GOARCH)}, @@ -547,6 +560,94 @@ func TestCoordinator_StateSubscribe(t *testing.T) { require.NoError(t, err) } +func TestCoordinator_StateSubscribeIsolatedUnits(t *testing.T) { + coordCh := make(chan error) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + coord, cfgMgr, varsMgr := createCoordinator(t, ctx, WithComponentInputSpec(fakeIsolatedUnitsInputSpec)) + go func() { + err := coord.Run(ctx) + if errors.Is(err, context.Canceled) { + // allowed error + err = nil + } + coordCh <- err + }() + + resultChan := make(chan error) + go func() { + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + subChan := coord.StateSubscribe(ctx, 32) + for { + select { + case <-ctx.Done(): + resultChan <- ctx.Err() + return + case state := <-subChan: + t.Logf("%+v", state) + if len(state.Components) == 3 { + compState0 := getComponentState(state.Components, "fake-isolated-units-default-fake-isolated-units-0") + compState1 := getComponentState(state.Components, "fake-isolated-units-default-fake-isolated-units-1") + if compState0 != nil && compState1 != nil { + unit0, ok0 := compState0.State.Units[runtime.ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "fake-isolated-units-default-fake-isolated-units-0-unit"}] + unit1, ok1 := compState1.State.Units[runtime.ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "fake-isolated-units-default-fake-isolated-units-1-unit"}] + if ok0 && ok1 { + if (unit0.State == client.UnitStateHealthy && unit0.Message == "Healthy From Fake Isolated Units 0 Config") && + (unit1.State == client.UnitStateHealthy && unit1.Message == "Healthy From Fake Isolated Units 1 Config") { + resultChan <- nil + return + } + } + } + } + } + } + }() + + // no vars used by the config + varsMgr.Vars(ctx, []*transpiler.Vars{{}}) + + // set the configuration to run a fake input + 
cfg, err := config.NewConfigFrom(map[string]interface{}{ + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "type": "fake-action-output", + "shipper": map[string]interface{}{ + "enabled": true, + }, + }, + }, + "inputs": []interface{}{ + map[string]interface{}{ + "id": "fake-isolated-units-0", + "type": "fake-isolated-units", + "use_output": "default", + "state": client.UnitStateHealthy, + "message": "Healthy From Fake Isolated Units 0 Config", + }, + map[string]interface{}{ + "id": "fake-isolated-units-1", + "type": "fake-isolated-units", + "use_output": "default", + "state": client.UnitStateHealthy, + "message": "Healthy From Fake Isolated Units 1 Config", + }, + }, + }) + require.NoError(t, err) + cfgMgr.Config(ctx, cfg) + + err = <-resultChan + require.NoError(t, err) + cancel() + + err = <-coordCh + require.NoError(t, err) +} + func TestCollectManagerErrorsTimeout(t *testing.T) { handlerChan, _, _, _, _ := setupManagerShutdownChannels(time.Millisecond) // Don't send anything to the shutdown channels, causing a timeout @@ -757,6 +858,7 @@ func TestCoordinator_UpgradeDetails(t *testing.T) { type createCoordinatorOpts struct { managed bool upgradeManager UpgradeManager + compInputSpec component.InputSpec } type CoordinatorOpt func(o *createCoordinatorOpts) @@ -773,13 +875,21 @@ func WithUpgradeManager(upgradeManager UpgradeManager) CoordinatorOpt { } } +func WithComponentInputSpec(spec component.InputSpec) CoordinatorOpt { + return func(o *createCoordinatorOpts) { + o.compInputSpec = spec + } +} + // createCoordinator creates a coordinator that using a fake config manager and a fake vars manager. // // The runtime specifications is set up to use both the fake component and fake shipper. 
func createCoordinator(t *testing.T, ctx context.Context, opts ...CoordinatorOpt) (*Coordinator, *fakeConfigManager, *fakeVarsManager) { t.Helper() - o := &createCoordinatorOpts{} + o := &createCoordinatorOpts{ + compInputSpec: fakeInputSpec, + } for _, opt := range opts { opt(o) } @@ -793,7 +903,7 @@ func createCoordinator(t *testing.T, ctx context.Context, opts ...CoordinatorOpt InputType: "fake", BinaryName: "", BinaryPath: testBinary(t, "component"), - Spec: fakeInputSpec, + Spec: o.compInputSpec, } shipperSpec := component.ShipperRuntimeSpec{ ShipperType: "fake-shipper", diff --git a/internal/pkg/composable/providers/kubernetesleaderelection/kubernetes_leaderelection.go b/internal/pkg/composable/providers/kubernetesleaderelection/kubernetes_leaderelection.go index c32b57d78c8..056b0e28b0c 100644 --- a/internal/pkg/composable/providers/kubernetesleaderelection/kubernetes_leaderelection.go +++ b/internal/pkg/composable/providers/kubernetesleaderelection/kubernetes_leaderelection.go @@ -10,10 +10,12 @@ import ( "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8sclient "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" "github.com/elastic/elastic-agent-autodiscover/kubernetes" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" "github.com/elastic/elastic-agent/internal/pkg/agent/errors" "github.com/elastic/elastic-agent/internal/pkg/composable" @@ -22,6 +24,8 @@ import ( "github.com/elastic/elastic-agent/pkg/core/logger" ) +const leaderElectorPrefix = "elastic-agent-leader-" + func init() { composable.Providers.MustAddContextProvider("kubernetes_leaderelection", ContextProviderBuilder) } @@ -45,11 +49,15 @@ func ContextProviderBuilder(logger *logger.Logger, c *config.Config, managed boo return &contextProvider{logger, &cfg, nil}, nil } +// This is needed to overwrite the Kubernetes client for the tests +var getK8sClientFunc = func(kubeconfig string, opt 
kubernetes.KubeClientOptions) (k8sclient.Interface, error) { + return kubernetes.GetKubernetesClient(kubeconfig, opt) +} + // Run runs the leaderelection provider. func (p *contextProvider) Run(ctx context.Context, comm corecomp.ContextProviderComm) error { - client, err := kubernetes.GetKubernetesClient(p.config.KubeConfig, p.config.KubeClientOptions) + client, err := getK8sClientFunc(p.config.KubeConfig, p.config.KubeClientOptions) if err != nil { - // info only; return nil (do nothing) p.logger.Debugf("Kubernetes leaderelection provider skipped, unable to connect: %s", err) return nil } @@ -61,9 +69,9 @@ func (p *contextProvider) Run(ctx context.Context, comm corecomp.ContextProvider var id string podName, found := os.LookupEnv("POD_NAME") if found { - id = "elastic-agent-leader-" + podName + id = leaderElectorPrefix + podName } else { - id = "elastic-agent-leader-" + agentInfo.AgentID() + id = leaderElectorPrefix + agentInfo.AgentID() } ns, err := kubernetes.InClusterNamespace() @@ -104,9 +112,14 @@ func (p *contextProvider) Run(ctx context.Context, comm corecomp.ContextProvider p.logger.Errorf("error while creating Leader Elector: %v", err) } p.logger.Debugf("Starting Leader Elector") - le.Run(comm) - p.logger.Debugf("Stopped Leader Elector") - return comm.Err() + + for { + le.Run(ctx) + if ctx.Err() != nil { + p.logger.Debugf("Stopped Leader Elector") + return comm.Err() + } + } } func (p *contextProvider) startLeading(comm corecomp.ContextProviderComm) { diff --git a/internal/pkg/composable/providers/kubernetesleaderelection/kubernetes_leaderelection_test.go b/internal/pkg/composable/providers/kubernetesleaderelection/kubernetes_leaderelection_test.go new file mode 100644 index 00000000000..6abb0a3e0e9 --- /dev/null +++ b/internal/pkg/composable/providers/kubernetesleaderelection/kubernetes_leaderelection_test.go @@ -0,0 +1,229 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. 
Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package kubernetesleaderelection + +import ( + "context" + "os" + "testing" + "time" + + autodiscoverK8s "github.com/elastic/elastic-agent-autodiscover/kubernetes" + + "github.com/stretchr/testify/require" + v1 "k8s.io/api/coordination/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + k8sfake "k8s.io/client-go/kubernetes/fake" + + "github.com/elastic/elastic-agent-libs/logp" + + ctesting "github.com/elastic/elastic-agent/internal/pkg/composable/testing" + "github.com/elastic/elastic-agent/internal/pkg/config" +) + +const namespace = "default" +const leaseName = "agent-lease-test" + +// createLease creates a new lease resource +func createLease() *v1.Lease { + lease := &v1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Name: leaseName, + Namespace: namespace, + }, + } + return lease +} + +// applyLease applies the lease +func applyLease(client kubernetes.Interface, lease *v1.Lease, firstTime bool) error { + var err error + if firstTime { + _, err = client.CoordinationV1().Leases(namespace).Create(context.Background(), lease, metav1.CreateOptions{}) + return err + } + _, err = client.CoordinationV1().Leases(namespace).Update(context.Background(), lease, metav1.UpdateOptions{}) + return err +} + +// getLeaseHolder returns the holder identity of the lease +func getLeaseHolder(client kubernetes.Interface) (string, error) { + lease, err := client.CoordinationV1().Leases(namespace).Get(context.Background(), leaseName, metav1.GetOptions{}) + if err != nil { + return "", err + } + holder := lease.Spec.HolderIdentity + if holder == nil { + return "", err + } + return *holder, nil +} + +// TestNewLeaderElectionManager will test the leader elector. +// We will try to check if an instance can acquire the lease more than one time. 
This way, we will know that +// the leader elector starts running again after it has stopped - which happens once a leader loses the lease. +// To make sure that happens we will do the following: +// 1. We will create the lease to be used by the leader elector. +// 2. We will create two context providers - in the default context, this would mean two nodes, each one with an agent running. +// We will wait for one of the agents, agent1, to acquire the lease, before starting the other. +// 3. We force the lease to be acquired by the other agent, agent2. +// 4. We will force the lease to be acquired by the agent1 again. To avoid the agent2 reacquiring it multiple times, +// we will stop this provider and make sure the agent1 can reacquire it. +func TestNewLeaderElectionManager(t *testing.T) { + client := k8sfake.NewSimpleClientset() + + lease := createLease() + // create the lease that leader election will be using + err := applyLease(client, lease, true) + require.NoError(t, err) + + // Create the provider + logger := logp.NewLogger("test_leaderelection") + + leaseDuration := 3 + leaseRenewDeadline := 2 + leaseRetryPeriod := 1 + + c := map[string]interface{}{ + "leader_lease": leaseName, + "leader_leaseduration": leaseDuration, + "leader_renewdeadline": leaseRenewDeadline, + "leader_retryperiod": leaseRetryPeriod, + } + cfg, err := config.NewConfigFrom(c) + require.NoError(t, err) + + getK8sClientFunc = func(kubeconfig string, opt autodiscoverK8s.KubeClientOptions) (kubernetes.Interface, error) { + return client, nil + } + require.NoError(t, err) + + podNames := [2]string{"agent1", "agent2"} + cancelFuncs := [2]context.CancelFunc{} + + done := make(chan int, 1) + + // Create two leader election providers representing two agents running + for i := 0; i < 2; i++ { + p, err := ContextProviderBuilder(logger, cfg, true) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + cancelFuncs[i] = cancel + defer cancel() + + comm := 
 ctesting.NewContextComm(ctx) + + err = os.Setenv("POD_NAME", podNames[i]) + if err != nil { + require.FailNow(t, "Failed to set pod name environment variable.") + } + go func() { + _ = p.Run(ctx, comm) + }() + + if i == 1 { + break + } + + // We need to wait for the first agent to acquire the lease, so we can set the POD_NAME environment variable again + go func() { + expectedLeader := leaderElectorPrefix + podNames[i] + for { + holder, err := getLeaseHolder(client) + require.NoError(t, err) + + if holder == expectedLeader { + done <- 1 + break + } + } + }() + + select { + case <-done: + case <-time.After(time.Duration(leaseDuration+leaseRetryPeriod) * 30 * time.Second): + require.FailNow(t, "Timeout"+ + " while waiting for the first pod to acquire the lease. This should not happen. Consider increasing "+ + "the timeout.") + } + } + + go func() { + // At this point the current holder is agent1. Let's change it to agent2. + for { + // Force the lease to be applied again, so a new leader is elected. + intermediateHolder := "does-not-matter" + lease.Spec.HolderIdentity = &intermediateHolder + err = applyLease(client, lease, false) + require.NoError(t, err) + + var currentHolder string + for { + currentHolder, err = getLeaseHolder(client) + require.NoError(t, err) + + // In this case, we already have an agent as holder + if currentHolder == leaderElectorPrefix+podNames[0] || currentHolder == leaderElectorPrefix+podNames[1] { + break + } + } + + if currentHolder == leaderElectorPrefix+podNames[1] { + done <- 1 + break + } + } + }() + + select { + case <-done: + case <-time.After(time.Duration(leaseDuration+leaseRetryPeriod) * 30 * time.Second): + require.FailNow(t, "Timeout "+ + " while waiting for agent2 to acquire the lease. This should not happen. Consider increasing "+ + "the timeout.") + } + + // Now that the holder is agent2, let's wait for agent1 to be reelected.
+ // To avoid having to wait very long, the context of agent2 will be canceled so the leader elector will not be + // running anymore. This way there is only one instance fighting to acquire the lease. + cancelFuncs[1]() + go func() { + for { + // Force the lease to be applied again, so a new leader is elected. + intermediateHolder := "does-not-matter" + lease.Spec.HolderIdentity = &intermediateHolder + err = applyLease(client, lease, false) + require.NoError(t, err) + + var currentHolder string + for { + currentHolder, err = getLeaseHolder(client) + require.NoError(t, err) + + // In this case, we already have an agent as holder + if currentHolder == leaderElectorPrefix+podNames[0] || currentHolder == leaderElectorPrefix+podNames[1] { + break + } + } + + if currentHolder == leaderElectorPrefix+podNames[0] { + done <- 1 + break + } + } + }() + + select { + case <-done: + case <-time.After(time.Duration(leaseDuration+leaseRetryPeriod) * 30 * time.Second): + require.FailNow(t, "Timeout"+ + " while waiting for agent1 to reacquire the lease. This should not happen. Consider increasing "+ + "the timeout.") + } + + cancelFuncs[0]() +} diff --git a/pkg/component/component.go b/pkg/component/component.go index 80b54a8a82c..954fc373a13 100644 --- a/pkg/component/component.go +++ b/pkg/component/component.go @@ -351,18 +351,13 @@ func unitForShipperOutput(output outputI, id string, shipperType string) Unit { } } -// Collect all inputs of the given type going to the given output and return -// the resulting Component. The returned Component may have no units if no -// active inputs were found. -func (r *RuntimeSpecs) componentForInputType( - inputType string, +// createShipperReference creates a ShipperReference for the given output and input spec. 
+func (r *RuntimeSpecs) createShipperReference( output outputI, - featureFlags *features.Flags, - componentConfig *ComponentConfig, -) Component { - componentID := fmt.Sprintf("%s-%s", inputType, output.name) - - inputSpec, componentErr := r.GetInput(inputType) + inputSpec InputRuntimeSpec, + componentID string, + componentErr error, +) (*ShipperReference, error) { var shipperRef *ShipperReference if componentErr == nil { if output.shipperEnabled { @@ -393,40 +388,113 @@ func (r *RuntimeSpecs) componentForInputType( } } } + // If there's an error at this point we still proceed with assembling the // policy into a component, we just attach the error to its Err field to // indicate that it can't be run. + return shipperRef, componentErr +} - var units []Unit - for _, input := range output.inputs[inputType] { - if input.enabled { - unitID := fmt.Sprintf("%s-%s", componentID, input.id) - units = append(units, unitForInput(input, unitID)) +// populateOutputUnitsForInput adds the output units to the given slice. +func populateOutputUnitsForInput( + units *[]Unit, + output outputI, + componentID string, + componentErr error, + shipperRef *ShipperReference, +) { + if shipperRef != nil { + // Shipper units are skipped if componentErr isn't nil, because in that + // case we generally don't have a valid shipper type to base it on. + if componentErr == nil { + *units = append(*units, unitForShipperOutput(output, componentID, shipperRef.ShipperType)) } + } else { + *units = append(*units, unitForOutput(output, componentID)) } - if len(units) > 0 { - if shipperRef != nil { - // Shipper units are skipped if componentErr isn't nil, because in that - // case we generally don't have a valid shipper type to base it on. - if componentErr == nil { - units = append(units, - unitForShipperOutput(output, componentID, shipperRef.ShipperType)) +} + +// Collect all inputs of the given type going to the given output and return +// the resulting Components. 
The returned Components may have no units if no +// active inputs were found. +func (r *RuntimeSpecs) componentsForInputType( + inputType string, + output outputI, + featureFlags *features.Flags, + componentConfig *ComponentConfig, +) []Component { + var components []Component + inputSpec, componentErr := r.GetInput(inputType) + + // Treat as non isolated units component on error of reading the input spec + if componentErr != nil || !inputSpec.Spec.IsolateUnits { + componentID := fmt.Sprintf("%s-%s", inputType, output.name) + shipperRef, componentErr := r.createShipperReference(output, inputSpec, componentID, componentErr) + + var units []Unit + for _, input := range output.inputs[inputType] { + if input.enabled { + unitID := fmt.Sprintf("%s-%s", componentID, input.id) + units = append(units, unitForInput(input, unitID)) } - } else { - units = append(units, unitForOutput(output, componentID)) + } + + if len(units) > 0 { + // Populate the output units for this component + populateOutputUnitsForInput( + &units, + output, + componentID, + componentErr, + shipperRef, + ) + } + + components = append(components, Component{ + ID: componentID, + Err: componentErr, + InputSpec: &inputSpec, + InputType: inputType, + OutputType: output.outputType, + Units: units, + Features: featureFlags.AsProto(), + Component: componentConfig.AsProto(), + ShipperRef: shipperRef, + }) + } else { + for _, input := range output.inputs[inputType] { + // Units are being mapped to components, so we need a unique ID for each. 
+ componentID := fmt.Sprintf("%s-%s-%s", inputType, output.name, input.id) + shipperRef, componentErr := r.createShipperReference(output, inputSpec, componentID, componentErr) + + var units []Unit + if input.enabled { + unitID := fmt.Sprintf("%s-unit", componentID) + units = append(units, unitForInput(input, unitID)) + // Populate the output units for this component + populateOutputUnitsForInput( + &units, + output, + componentID, + componentErr, + shipperRef, + ) + } + + components = append(components, Component{ + ID: componentID, + Err: componentErr, + InputSpec: &inputSpec, + InputType: inputType, + OutputType: output.outputType, + Units: units, + Features: featureFlags.AsProto(), + Component: componentConfig.AsProto(), + ShipperRef: shipperRef, + }) } } - return Component{ - ID: componentID, - Err: componentErr, - InputSpec: &inputSpec, - InputType: inputType, - OutputType: output.outputType, - Units: units, - Features: featureFlags.AsProto(), - Component: componentConfig.AsProto(), - ShipperRef: shipperRef, - } + return components } func (r *RuntimeSpecs) componentsForOutput(output outputI, featureFlags *features.Flags, componentConfig *ComponentConfig) []Component { @@ -434,17 +502,19 @@ func (r *RuntimeSpecs) componentsForOutput(output outputI, featureFlags *feature shipperTypes := make(map[string]bool) for inputType := range output.inputs { // No need for error checking at this stage -- we are guaranteed - // to get a Component back. If there is an error that prevents it + // to get a Component/s back. If there is an error that prevents it/them // from running then it will be in the Component's Err field and - // we will report it later. The only thing we skip is a component + // we will report it later. The only thing we skip is a component/s // with no units. 
- component := r.componentForInputType(inputType, output, featureFlags, componentConfig) - if len(component.Units) > 0 { - if component.ShipperRef != nil { - // If this component uses a shipper, mark that shipper type as active - shipperTypes[component.ShipperRef.ShipperType] = true + typeComponents := r.componentsForInputType(inputType, output, featureFlags, componentConfig) + for _, component := range typeComponents { + if len(component.Units) > 0 { + if component.ShipperRef != nil { + // If this component uses a shipper, mark that shipper type as active + shipperTypes[component.ShipperRef.ShipperType] = true + } + components = append(components, component) } - components = append(components, component) } } diff --git a/pkg/component/component_test.go b/pkg/component/component_test.go index e734ab5de01..562ecc1450d 100644 --- a/pkg/component/component_test.go +++ b/pkg/component/component_test.go @@ -234,6 +234,27 @@ func TestToComponents(t *testing.T) { }, Err: `invalid 'inputs.1.id', has a duplicate id "filestream". Please add a unique value for the 'id' key to each input in the agent policy`, }, + { + Name: "Invalid: inputs entry duplicate because of missing id (isolated units)", + Platform: linuxAMD64Platform, + Policy: map[string]interface{}{ + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "type": "elasticsearch", + "enabled": true, + }, + }, + "inputs": []interface{}{ + map[string]interface{}{ + "type": "cloudbeat", + }, + map[string]interface{}{ + "type": "cloudbeat", + }, + }, + }, + Err: `invalid 'inputs.1.id', has a duplicate id "cloudbeat". 
Please add a unique value for the 'id' key to each input in the agent policy`, + }, { Name: "Invalid: inputs entry id not a string", Platform: linuxAMD64Platform, @@ -253,6 +274,25 @@ func TestToComponents(t *testing.T) { }, Err: "invalid 'inputs.0.id', expected a string not a int", }, + { + Name: "Invalid: inputs entry id not a string (isolated units)", + Platform: linuxAMD64Platform, + Policy: map[string]interface{}{ + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "type": "elasticsearch", + "enabled": true, + }, + }, + "inputs": []interface{}{ + map[string]interface{}{ + "type": "cloudbeat", + "id": 0, + }, + }, + }, + Err: "invalid 'inputs.0.id', expected a string not a int", + }, { Name: "Invalid: inputs entry use_output not a string", Platform: linuxAMD64Platform, @@ -273,6 +313,26 @@ func TestToComponents(t *testing.T) { }, Err: "invalid 'inputs.0.use_output', expected a string not a int", }, + { + Name: "Invalid: inputs entry use_output not a string (isolated units)", + Platform: linuxAMD64Platform, + Policy: map[string]interface{}{ + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "type": "elasticsearch", + "enabled": true, + }, + }, + "inputs": []interface{}{ + map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-0", + "use_output": 0, + }, + }, + }, + Err: "invalid 'inputs.0.use_output', expected a string not a int", + }, { Name: "Invalid: inputs entry use_output references unknown output", Platform: linuxAMD64Platform, @@ -293,6 +353,26 @@ func TestToComponents(t *testing.T) { }, Err: "invalid 'inputs.0.use_output', references an unknown output 'other'", }, + { + Name: "Invalid: inputs entry use_output references unknown output (isolated units)", + Platform: linuxAMD64Platform, + Policy: map[string]interface{}{ + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "type": "elasticsearch", + "enabled": true, + }, + }, + "inputs": []interface{}{ + 
map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-0", + "use_output": "other", + }, + }, + }, + Err: "invalid 'inputs.0.use_output', references an unknown output 'other'", + }, { Name: "Invalid: inputs entry enabled not a bool", Platform: linuxAMD64Platform, @@ -314,6 +394,27 @@ func TestToComponents(t *testing.T) { }, Err: "invalid 'inputs.0.enabled', expected a bool not a string", }, + { + Name: "Invalid: inputs entry enabled not a bool (isolated units)", + Platform: linuxAMD64Platform, + Policy: map[string]interface{}{ + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "type": "elasticsearch", + "enabled": true, + }, + }, + "inputs": []interface{}{ + map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-0", + "use_output": "default", + "enabled": "false", + }, + }, + }, + Err: "invalid 'inputs.0.enabled', expected a bool not a string", + }, { Name: "Invalid: inputs unknown type", Platform: linuxAMD64Platform, @@ -536,6 +637,94 @@ func TestToComponents(t *testing.T) { }, }, }, + { + Name: "Invalid: single input failed to decode into config (isolated units)", + Platform: linuxAMD64Platform, + Policy: map[string]interface{}{ + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "type": "elasticsearch", + "enabled": true, + }, + }, + "inputs": []interface{}{ + map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-0", + "use_output": "default", + "enabled": true, + }, + map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-1", + "use_output": "default", + "enabled": true, + "meta": []interface{}{ + map[string]interface{}{ + "bad": "should not have been array of dicts", + }, + }, + }, + }, + }, + Result: []Component{ + { + InputType: "cloudbeat", + OutputType: "elasticsearch", + ID: "cloudbeat-default-cloudbeat-0", + InputSpec: &InputRuntimeSpec{ + InputType: "cloudbeat", + BinaryName: "cloudbeat", + BinaryPath: filepath.Join("..", "..", "specs", "cloudbeat"), + }, + 
Units: []Unit{ + { + ID: "cloudbeat-default-cloudbeat-0", + Type: client.UnitTypeOutput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "elasticsearch", + }), + }, + { + ID: "cloudbeat-default-cloudbeat-0-unit", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-0", + }), + }, + }, + }, + { + InputType: "cloudbeat", + OutputType: "elasticsearch", + ID: "cloudbeat-default-cloudbeat-1", + InputSpec: &InputRuntimeSpec{ + InputType: "cloudbeat", + BinaryName: "cloudbeat", + BinaryPath: filepath.Join("..", "..", "specs", "cloudbeat"), + }, + Units: []Unit{ + { + ID: "cloudbeat-default-cloudbeat-1", + Type: client.UnitTypeOutput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "elasticsearch", + }), + }, + { + ID: "cloudbeat-default-cloudbeat-1-unit", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Err: errors.New("1 decoding error(s): 'meta' expected a map, got 'slice'"), + }, + }, + }, + }, + }, { Name: "Output disabled", Platform: linuxAMD64Platform, @@ -555,6 +744,25 @@ func TestToComponents(t *testing.T) { }, }, }, + { + Name: "Output disabled (isolated units)", + Platform: linuxAMD64Platform, + Policy: map[string]interface{}{ + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "type": "elasticsearch", + "enabled": false, + }, + }, + "inputs": []interface{}{ + map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-0", + "enabled": true, + }, + }, + }, + }, { Name: "Input disabled", Platform: linuxAMD64Platform, @@ -574,6 +782,30 @@ func TestToComponents(t *testing.T) { }, }, }, + { + Name: "Input disabled (isolated units)", + Platform: linuxAMD64Platform, + Policy: map[string]interface{}{ + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "type": "elasticsearch", + "enabled": true, + }, + 
}, + "inputs": []interface{}{ + map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-1", + "enabled": false, + }, + map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-2", + "enabled": false, + }, + }, + }, + }, { Name: "Simple representation", Platform: linuxAMD64Platform, @@ -629,9 +861,8 @@ func TestToComponents(t *testing.T) { }, }, { - Name: "Debug log level", + Name: "Simple representation (isolated units)", Platform: linuxAMD64Platform, - LogLevel: logp.DebugLevel, Policy: map[string]interface{}{ "outputs": map[string]interface{}{ "default": map[string]interface{}{ @@ -641,42 +872,42 @@ func TestToComponents(t *testing.T) { }, "inputs": []interface{}{ map[string]interface{}{ - "type": "filestream", - "id": "filestream-0", + "type": "cloudbeat", + "id": "cloudbeat-0", "enabled": true, }, map[string]interface{}{ - "type": "filestream", - "id": "filestream-1", + "type": "cloudbeat", + "id": "cloudbeat-1", "enabled": false, }, }, }, Result: []Component{ { - InputType: "filestream", + InputType: "cloudbeat", OutputType: "elasticsearch", InputSpec: &InputRuntimeSpec{ - InputType: "filestream", - BinaryName: "filebeat", - BinaryPath: filepath.Join("..", "..", "specs", "filebeat"), + InputType: "cloudbeat", + BinaryName: "cloudbeat", + BinaryPath: filepath.Join("..", "..", "specs", "cloudbeat"), }, Units: []Unit{ { - ID: "filestream-default", + ID: "cloudbeat-default-cloudbeat-0", Type: client.UnitTypeOutput, - LogLevel: client.UnitLogLevelDebug, + LogLevel: defaultUnitLogLevel, Config: MustExpectedConfig(map[string]interface{}{ "type": "elasticsearch", }), }, { - ID: "filestream-default-filestream-0", + ID: "cloudbeat-default-cloudbeat-0-unit", Type: client.UnitTypeInput, - LogLevel: client.UnitLogLevelDebug, + LogLevel: defaultUnitLogLevel, Config: MustExpectedConfig(map[string]interface{}{ - "type": "filestream", - "id": "filestream-0", + "type": "cloudbeat", + "id": "cloudbeat-0", }), }, }, @@ -684,9 +915,9 @@ func TestToComponents(t 
*testing.T) { }, }, { - Name: "Unique log level", + Name: "Debug log level", Platform: linuxAMD64Platform, - LogLevel: logp.ErrorLevel, + LogLevel: logp.DebugLevel, Policy: map[string]interface{}{ "outputs": map[string]interface{}{ "default": map[string]interface{}{ @@ -696,10 +927,9 @@ func TestToComponents(t *testing.T) { }, "inputs": []interface{}{ map[string]interface{}{ - "type": "filestream", - "id": "filestream-0", - "enabled": true, - "log_level": "debug", + "type": "filestream", + "id": "filestream-0", + "enabled": true, }, map[string]interface{}{ "type": "filestream", @@ -721,7 +951,7 @@ func TestToComponents(t *testing.T) { { ID: "filestream-default", Type: client.UnitTypeOutput, - LogLevel: client.UnitLogLevelError, + LogLevel: client.UnitLogLevelDebug, Config: MustExpectedConfig(map[string]interface{}{ "type": "elasticsearch", }), @@ -740,46 +970,213 @@ func TestToComponents(t *testing.T) { }, }, { - Name: "Complex representation", + Name: "Debug log level (isolated units)", Platform: linuxAMD64Platform, + LogLevel: logp.DebugLevel, Policy: map[string]interface{}{ "outputs": map[string]interface{}{ "default": map[string]interface{}{ - "type": "elasticsearch", - }, - "other": map[string]interface{}{ - "type": "elasticsearch", - }, - "stashit": map[string]interface{}{ - "type": "logstash", - }, - "redis": map[string]interface{}{ - "type": "redis", + "type": "elasticsearch", + "enabled": true, }, }, "inputs": []interface{}{ map[string]interface{}{ - "type": "filestream", - "id": "filestream-0", - }, - map[string]interface{}{ - "type": "filestream", - "id": "filestream-1", + "type": "cloudbeat", + "id": "cloudbeat-0", + "enabled": true, }, map[string]interface{}{ - "type": "filestream", - "id": "filestream-2", + "type": "cloudbeat", + "id": "cloudbeat-1", "enabled": false, }, - map[string]interface{}{ - "type": "filestream", - "id": "filestream-3", - "use_output": "other", + }, + }, + Result: []Component{ + { + InputType: "cloudbeat", + OutputType: 
"elasticsearch", + InputSpec: &InputRuntimeSpec{ + InputType: "cloudbeat", + BinaryName: "cloudbeat", + BinaryPath: filepath.Join("..", "..", "specs", "cloudbeat"), }, - map[string]interface{}{ - "type": "filestream", - "id": "filestream-4", - "use_output": "other", + Units: []Unit{ + { + ID: "cloudbeat-default-cloudbeat-0", + Type: client.UnitTypeOutput, + LogLevel: client.UnitLogLevelDebug, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "elasticsearch", + }), + }, + { + ID: "cloudbeat-default-cloudbeat-0-unit", + Type: client.UnitTypeInput, + LogLevel: client.UnitLogLevelDebug, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-0", + }), + }, + }, + }, + }, + }, + { + Name: "Unique log level", + Platform: linuxAMD64Platform, + LogLevel: logp.ErrorLevel, + Policy: map[string]interface{}{ + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "type": "elasticsearch", + "enabled": true, + }, + }, + "inputs": []interface{}{ + map[string]interface{}{ + "type": "filestream", + "id": "filestream-0", + "enabled": true, + "log_level": "debug", + }, + map[string]interface{}{ + "type": "filestream", + "id": "filestream-1", + "enabled": false, + }, + }, + }, + Result: []Component{ + { + InputType: "filestream", + OutputType: "elasticsearch", + InputSpec: &InputRuntimeSpec{ + InputType: "filestream", + BinaryName: "filebeat", + BinaryPath: filepath.Join("..", "..", "specs", "filebeat"), + }, + Units: []Unit{ + { + ID: "filestream-default", + Type: client.UnitTypeOutput, + LogLevel: client.UnitLogLevelError, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "elasticsearch", + }), + }, + { + ID: "filestream-default-filestream-0", + Type: client.UnitTypeInput, + LogLevel: client.UnitLogLevelDebug, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "filestream", + "id": "filestream-0", + }), + }, + }, + }, + }, + }, + { + Name: "Unique log level (isolated units)", + Platform: 
linuxAMD64Platform, + LogLevel: logp.ErrorLevel, + Policy: map[string]interface{}{ + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "type": "elasticsearch", + "enabled": true, + }, + }, + "inputs": []interface{}{ + map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-0", + "enabled": true, + "log_level": "debug", + }, + map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-1", + "enabled": false, + }, + }, + }, + Result: []Component{ + { + InputType: "cloudbeat", + OutputType: "elasticsearch", + InputSpec: &InputRuntimeSpec{ + InputType: "cloudbeat", + BinaryName: "cloudbeat", + BinaryPath: filepath.Join("..", "..", "specs", "cloudbeat"), + }, + Units: []Unit{ + { + ID: "cloudbeat-default-cloudbeat-0", + Type: client.UnitTypeOutput, + LogLevel: client.UnitLogLevelError, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "elasticsearch", + }), + }, + { + ID: "cloudbeat-default-cloudbeat-0-unit", + Type: client.UnitTypeInput, + LogLevel: client.UnitLogLevelDebug, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-0", + }), + }, + }, + }, + }, + }, + { + Name: "Complex representation", + Platform: linuxAMD64Platform, + Policy: map[string]interface{}{ + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "type": "elasticsearch", + }, + "other": map[string]interface{}{ + "type": "elasticsearch", + }, + "stashit": map[string]interface{}{ + "type": "logstash", + }, + "redis": map[string]interface{}{ + "type": "redis", + }, + }, + "inputs": []interface{}{ + map[string]interface{}{ + "type": "filestream", + "id": "filestream-0", + }, + map[string]interface{}{ + "type": "filestream", + "id": "filestream-1", + }, + map[string]interface{}{ + "type": "filestream", + "id": "filestream-2", + "enabled": false, + }, + map[string]interface{}{ + "type": "filestream", + "id": "filestream-3", + "use_output": "other", + }, + map[string]interface{}{ + "type": 
"filestream", + "id": "filestream-4", + "use_output": "other", }, map[string]interface{}{ "type": "logfile", @@ -1039,77 +1436,118 @@ func TestToComponents(t *testing.T) { }, }, { - Name: "Simple w/ shipper", + Name: "Complex representation (isolated units)", Platform: linuxAMD64Platform, Policy: map[string]interface{}{ "outputs": map[string]interface{}{ "default": map[string]interface{}{ - "type": "elasticsearch", - "enabled": true, - "shipper": map[string]interface{}{ - "enabled": true, - }, + "type": "elasticsearch", + }, + "other": map[string]interface{}{ + "type": "elasticsearch", + }, + "stashit": map[string]interface{}{ + "type": "logstash", + }, + "redis": map[string]interface{}{ + "type": "redis", }, }, "inputs": []interface{}{ map[string]interface{}{ - "type": "filestream", - "id": "filestream-0", - "enabled": true, + "type": "cloudbeat", + "id": "cloudbeat-0", }, map[string]interface{}{ - "type": "filestream", - "id": "filestream-1", + "type": "cloudbeat", + "id": "cloudbeat-1", + }, + map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-2", "enabled": false, }, + map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-3", + "use_output": "other", + }, + map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-4", + "use_output": "other", + }, + map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-5", + "use_output": "default", + }, + map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-6", + "use_output": "default", + }, + map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-7", + "use_output": "other", + }, + map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-8", + "use_output": "stashit", + }, + map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-9", + "use_output": "redis", + }, + map[string]interface{}{ + "type": "apm", + "id": "apm-server-0", + }, }, }, Result: []Component{ { - ID: "filestream-default", - InputType: "filestream", + InputType: 
"cloudbeat", OutputType: "elasticsearch", InputSpec: &InputRuntimeSpec{ - InputType: "filestream", - BinaryName: "filebeat", - BinaryPath: filepath.Join("..", "..", "specs", "filebeat"), + InputType: "cloudbeat", + BinaryName: "cloudbeat", + BinaryPath: filepath.Join("..", "..", "specs", "cloudbeat"), }, Units: []Unit{ { - ID: "filestream-default", + ID: "cloudbeat-default-cloudbeat-0", Type: client.UnitTypeOutput, LogLevel: defaultUnitLogLevel, Config: MustExpectedConfig(map[string]interface{}{ - "type": "shipper", + "type": "elasticsearch", }), }, { - ID: "filestream-default-filestream-0", + ID: "cloudbeat-default-cloudbeat-0-unit", Type: client.UnitTypeInput, LogLevel: defaultUnitLogLevel, Config: MustExpectedConfig(map[string]interface{}{ - "type": "filestream", - "id": "filestream-0", + "type": "cloudbeat", + "id": "cloudbeat-0", }), }, }, - ShipperRef: &ShipperReference{ - ShipperType: "shipper", - ComponentID: "shipper-default", - UnitID: "filestream-default", - }, }, { - ID: "shipper-default", + InputType: "cloudbeat", OutputType: "elasticsearch", - ShipperSpec: &ShipperRuntimeSpec{ - ShipperType: "shipper", - BinaryName: "shipper", - BinaryPath: filepath.Join("..", "..", "specs", "shipper"), + InputSpec: &InputRuntimeSpec{ + InputType: "cloudbeat", + BinaryName: "cloudbeat", + BinaryPath: filepath.Join("..", "..", "specs", "cloudbeat"), }, Units: []Unit{ { - ID: "shipper-default", + ID: "cloudbeat-default-cloudbeat-1", Type: client.UnitTypeOutput, LogLevel: defaultUnitLogLevel, Config: MustExpectedConfig(map[string]interface{}{ @@ -1117,37 +1555,933 @@ func TestToComponents(t *testing.T) { }), }, { - ID: "filestream-default", + ID: "cloudbeat-default-cloudbeat-1-unit", Type: client.UnitTypeInput, LogLevel: defaultUnitLogLevel, Config: MustExpectedConfig(map[string]interface{}{ - "id": "filestream-default", - "type": "shipper", - "units": []interface{}{ - map[string]interface{}{ - "id": "filestream-default-filestream-0", - "config": map[string]interface{}{ 
- "type": "filestream", - "id": "filestream-0", - }, - }, - }, + "type": "cloudbeat", + "id": "cloudbeat-1", }), }, }, }, - }, - }, - { - Name: "Complex w/ shipper", - Platform: linuxAMD64Platform, - Policy: map[string]interface{}{ - "outputs": map[string]interface{}{ - "default": map[string]interface{}{ - "type": "elasticsearch", - "shipper": map[string]interface{}{ - "enabled": true, - }, + { + InputType: "cloudbeat", + OutputType: "elasticsearch", + InputSpec: &InputRuntimeSpec{ + InputType: "cloudbeat", + BinaryName: "cloudbeat", + BinaryPath: filepath.Join("..", "..", "specs", "cloudbeat"), + }, + Units: []Unit{ + { + ID: "cloudbeat-other-cloudbeat-3", + Type: client.UnitTypeOutput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "elasticsearch", + }), + }, + { + ID: "cloudbeat-other-cloudbeat-3-unit", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-3", + }), + }, + }, + }, + { + InputType: "cloudbeat", + OutputType: "elasticsearch", + InputSpec: &InputRuntimeSpec{ + InputType: "cloudbeat", + BinaryName: "cloudbeat", + BinaryPath: filepath.Join("..", "..", "specs", "cloudbeat"), + }, + Units: []Unit{ + { + ID: "cloudbeat-other-cloudbeat-4", + Type: client.UnitTypeOutput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "elasticsearch", + }), + }, + { + ID: "cloudbeat-other-cloudbeat-4-unit", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-4", + }), + }, + }, + }, + { + InputType: "cloudbeat", + OutputType: "elasticsearch", + InputSpec: &InputRuntimeSpec{ + InputType: "cloudbeat", + BinaryName: "cloudbeat", + BinaryPath: filepath.Join("..", "..", "specs", "cloudbeat"), + }, + Units: []Unit{ + { + ID: "cloudbeat-default-cloudbeat-5", + Type: 
client.UnitTypeOutput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "elasticsearch", + }), + }, + { + ID: "cloudbeat-default-cloudbeat-5-unit", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: mustExpectedConfigForceType(map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-5", + }, "cloudbeat"), + }, + }, + }, + { + InputType: "cloudbeat", + OutputType: "elasticsearch", + InputSpec: &InputRuntimeSpec{ + InputType: "cloudbeat", + BinaryName: "cloudbeat", + BinaryPath: filepath.Join("..", "..", "specs", "cloudbeat"), + }, + Units: []Unit{ + { + ID: "cloudbeat-default-cloudbeat-6", + Type: client.UnitTypeOutput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "elasticsearch", + }), + }, + { + ID: "cloudbeat-default-cloudbeat-6-unit", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-6", + }), + }, + }, + }, + { + InputType: "cloudbeat", + OutputType: "elasticsearch", + InputSpec: &InputRuntimeSpec{ + InputType: "cloudbeat", + BinaryName: "cloudbeat", + BinaryPath: filepath.Join("..", "..", "specs", "cloudbeat"), + }, + Units: []Unit{ + { + ID: "cloudbeat-other-cloudbeat-7", + Type: client.UnitTypeOutput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "elasticsearch", + }), + }, + { + ID: "cloudbeat-other-cloudbeat-7-unit", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: mustExpectedConfigForceType(map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-7", + }, "cloudbeat"), + }, + }, + }, + { + InputType: "cloudbeat", + OutputType: "logstash", + InputSpec: &InputRuntimeSpec{ + InputType: "cloudbeat", + BinaryName: "cloudbeat", + BinaryPath: filepath.Join("..", "..", "specs", "cloudbeat"), + }, + Units: []Unit{ + { + ID: 
"cloudbeat-stashit-cloudbeat-8", + Type: client.UnitTypeOutput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "logstash", + }), + }, + { + ID: "cloudbeat-stashit-cloudbeat-8-unit", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: mustExpectedConfigForceType(map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-8", + }, "cloudbeat"), + }, + }, + }, + { + InputType: "cloudbeat", + OutputType: "redis", + InputSpec: &InputRuntimeSpec{ + InputType: "cloudbeat", + BinaryName: "cloudbeat", + BinaryPath: filepath.Join("..", "..", "specs", "cloudbeat"), + }, + Units: []Unit{ + { + ID: "cloudbeat-redis-cloudbeat-9", + Type: client.UnitTypeOutput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "redis", + }), + }, + { + ID: "cloudbeat-redis-cloudbeat-9-unit", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: mustExpectedConfigForceType(map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-9", + }, "cloudbeat"), + }, + }, + }, + { + InputType: "apm", + OutputType: "elasticsearch", + InputSpec: &InputRuntimeSpec{ + InputType: "apm", + BinaryName: "apm-server", + BinaryPath: filepath.Join("..", "..", "specs", "apm-server"), + }, + Units: []Unit{ + { + ID: "apm-default", + Type: client.UnitTypeOutput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "elasticsearch", + }), + }, + { + ID: "apm-default-apm-server-0", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "apm", + "id": "apm-server-0", + }), + }, + }, + }, + }, + }, + { + Name: "Simple w/ shipper", + Platform: linuxAMD64Platform, + Policy: map[string]interface{}{ + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "type": "elasticsearch", + "enabled": true, + "shipper": map[string]interface{}{ + "enabled": true, 
+ }, + }, + }, + "inputs": []interface{}{ + map[string]interface{}{ + "type": "filestream", + "id": "filestream-0", + "enabled": true, + }, + map[string]interface{}{ + "type": "filestream", + "id": "filestream-1", + "enabled": false, + }, + }, + }, + Result: []Component{ + { + ID: "filestream-default", + InputType: "filestream", + OutputType: "elasticsearch", + InputSpec: &InputRuntimeSpec{ + InputType: "filestream", + BinaryName: "filebeat", + BinaryPath: filepath.Join("..", "..", "specs", "filebeat"), + }, + Units: []Unit{ + { + ID: "filestream-default", + Type: client.UnitTypeOutput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "shipper", + }), + }, + { + ID: "filestream-default-filestream-0", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "filestream", + "id": "filestream-0", + }), + }, + }, + ShipperRef: &ShipperReference{ + ShipperType: "shipper", + ComponentID: "shipper-default", + UnitID: "filestream-default", + }, + }, + { + ID: "shipper-default", + OutputType: "elasticsearch", + ShipperSpec: &ShipperRuntimeSpec{ + ShipperType: "shipper", + BinaryName: "shipper", + BinaryPath: filepath.Join("..", "..", "specs", "shipper"), + }, + Units: []Unit{ + { + ID: "shipper-default", + Type: client.UnitTypeOutput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "elasticsearch", + }), + }, + { + ID: "filestream-default", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "id": "filestream-default", + "type": "shipper", + "units": []interface{}{ + map[string]interface{}{ + "id": "filestream-default-filestream-0", + "config": map[string]interface{}{ + "type": "filestream", + "id": "filestream-0", + }, + }, + }, + }), + }, + }, + }, + }, + }, + { + Name: "Simple w/ shipper (isolated units)", + Platform: linuxAMD64Platform, + 
Policy: map[string]interface{}{ + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "type": "elasticsearch", + "enabled": true, + "shipper": map[string]interface{}{ + "enabled": true, + }, + }, + }, + "inputs": []interface{}{ + map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-0", + "enabled": true, + }, + map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-1", + "enabled": false, + }, + }, + }, + Result: []Component{ + { + ID: "cloudbeat-default", + InputType: "cloudbeat", + OutputType: "elasticsearch", + InputSpec: &InputRuntimeSpec{ + InputType: "cloudbeat", + BinaryName: "cloudbeat", + BinaryPath: filepath.Join("..", "..", "specs", "cloudbeat"), + }, + Units: []Unit{ + { + ID: "cloudbeat-default-cloudbeat-0", + Type: client.UnitTypeOutput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "shipper", + }), + }, + { + ID: "cloudbeat-default-cloudbeat-0-unit", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-0", + }), + }, + }, + ShipperRef: &ShipperReference{ + ShipperType: "shipper", + ComponentID: "shipper-default", + UnitID: "cloudbeat-default-cloudbeat-0", + }, + }, + { + ID: "shipper-default", + OutputType: "elasticsearch", + ShipperSpec: &ShipperRuntimeSpec{ + ShipperType: "shipper", + BinaryName: "shipper", + BinaryPath: filepath.Join("..", "..", "specs", "shipper"), + }, + Units: []Unit{ + { + ID: "shipper-default", + Type: client.UnitTypeOutput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "elasticsearch", + }), + }, + { + ID: "cloudbeat-default-cloudbeat-0", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "id": "cloudbeat-default-cloudbeat-0", + "type": "shipper", + "units": []interface{}{ + map[string]interface{}{ + "id": 
"cloudbeat-default-cloudbeat-0-unit", + "config": map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-0", + }, + }, + }, + }), + }, + }, + }, + }, + }, + { + Name: "Complex w/ shipper", + Platform: linuxAMD64Platform, + Policy: map[string]interface{}{ + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "type": "elasticsearch", + "shipper": map[string]interface{}{ + "enabled": true, + }, + }, + "other": map[string]interface{}{ + "type": "elasticsearch", + "shipper": map[string]interface{}{ + "enabled": false, + }, + }, + "stashit": map[string]interface{}{ + "type": "logstash", + "shipper": map[string]interface{}{ + "enabled": true, + }, + }, + "redis": map[string]interface{}{ + "type": "redis", + "shipper": map[string]interface{}{ + "enabled": true, + }, + }, + }, + "inputs": []interface{}{ + map[string]interface{}{ + "type": "filestream", + "id": "filestream-0", + }, + map[string]interface{}{ + "type": "filestream", + "id": "filestream-1", + }, + map[string]interface{}{ + "type": "filestream", + "id": "filestream-2", + "enabled": false, + }, + map[string]interface{}{ + "type": "filestream", + "id": "filestream-3", + "use_output": "other", + }, + map[string]interface{}{ + "type": "filestream", + "id": "filestream-4", + "use_output": "other", + }, + map[string]interface{}{ + "type": "logfile", + "id": "logfile-0", + "use_output": "default", + }, + map[string]interface{}{ + "type": "log", + "id": "logfile-1", + "use_output": "default", + }, + map[string]interface{}{ + "type": "logfile", + "id": "logfile-2", + "use_output": "other", + }, + map[string]interface{}{ + "type": "logfile", + "id": "logfile-3", + "use_output": "stashit", + }, + map[string]interface{}{ + "type": "logfile", + "id": "logfile-4", + "use_output": "redis", + }, + map[string]interface{}{ + "type": "apm", + "id": "apm-server-0", + }, + }, + }, + Result: []Component{ + { + ID: "filestream-default", + InputType: "filestream", + OutputType: "elasticsearch", + 
InputSpec: &InputRuntimeSpec{ + InputType: "filestream", + BinaryName: "filebeat", + BinaryPath: filepath.Join("..", "..", "specs", "filebeat"), + }, + Units: []Unit{ + { + ID: "filestream-default", + Type: client.UnitTypeOutput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "shipper", + }), + }, + { + ID: "filestream-default-filestream-0", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "filestream", + "id": "filestream-0", + }), + }, + { + ID: "filestream-default-filestream-1", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "filestream", + "id": "filestream-1", + }), + }, + }, + ShipperRef: &ShipperReference{ + ShipperType: "shipper", + ComponentID: "shipper-default", + UnitID: "filestream-default", + }, + }, + { + ID: "filestream-other", + InputType: "filestream", + OutputType: "elasticsearch", + InputSpec: &InputRuntimeSpec{ + InputType: "filestream", + BinaryName: "filebeat", + BinaryPath: filepath.Join("..", "..", "specs", "filebeat"), + }, + Units: []Unit{ + { + ID: "filestream-other", + Type: client.UnitTypeOutput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "elasticsearch", + }), + }, + { + ID: "filestream-other-filestream-3", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "filestream", + "id": "filestream-3", + }), + }, + { + ID: "filestream-other-filestream-4", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "filestream", + "id": "filestream-4", + }), + }, + }, + }, + { + ID: "log-default", + InputType: "log", + OutputType: "elasticsearch", + InputSpec: &InputRuntimeSpec{ + InputType: "log", + BinaryName: "filebeat", + BinaryPath: 
filepath.Join("..", "..", "specs", "filebeat"), + }, + Units: []Unit{ + { + ID: "log-default", + Type: client.UnitTypeOutput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "shipper", + }), + }, + { + ID: "log-default-logfile-0", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: mustExpectedConfigForceType(map[string]interface{}{ + "type": "log", + "id": "logfile-0", + }, "log"), + }, + { + ID: "log-default-logfile-1", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "log", + "id": "logfile-1", + }), + }, + }, + ShipperRef: &ShipperReference{ + ShipperType: "shipper", + ComponentID: "shipper-default", + UnitID: "log-default", + }, + }, + { + ID: "shipper-default", + OutputType: "elasticsearch", + ShipperSpec: &ShipperRuntimeSpec{ + ShipperType: "shipper", + BinaryName: "shipper", + BinaryPath: filepath.Join("..", "..", "specs", "shipper"), + }, + Units: []Unit{ + { + ID: "filestream-default", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "id": "filestream-default", + "type": "shipper", + "units": []interface{}{ + map[string]interface{}{ + "id": "filestream-default-filestream-0", + "config": map[string]interface{}{ + "type": "filestream", + "id": "filestream-0", + }, + }, + map[string]interface{}{ + "id": "filestream-default-filestream-1", + "config": map[string]interface{}{ + "type": "filestream", + "id": "filestream-1", + }, + }, + }, + }), + }, + { + ID: "log-default", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "id": "log-default", + "type": "shipper", + "units": []interface{}{ + map[string]interface{}{ + "id": "log-default-logfile-0", + "config": map[string]interface{}{ + "type": "log", + "id": "logfile-0", + }, + }, + map[string]interface{}{ + "id": 
"log-default-logfile-1", + "config": map[string]interface{}{ + "type": "log", + "id": "logfile-1", + }, + }, + }, + }), + }, + { + ID: "shipper-default", + Type: client.UnitTypeOutput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "elasticsearch", + }), + }, + }, + }, + { + ID: "log-other", + InputType: "log", + OutputType: "elasticsearch", + InputSpec: &InputRuntimeSpec{ + InputType: "log", + BinaryName: "filebeat", + BinaryPath: filepath.Join("..", "..", "specs", "filebeat"), + }, + Units: []Unit{ + { + ID: "log-other", + Type: client.UnitTypeOutput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "elasticsearch", + }), + }, + { + ID: "log-other-logfile-2", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: mustExpectedConfigForceType(map[string]interface{}{ + "type": "log", + "id": "logfile-2", + }, "log"), + }, + }, + }, + { + ID: "log-stashit", + InputType: "log", + OutputType: "logstash", + InputSpec: &InputRuntimeSpec{ + InputType: "log", + BinaryName: "filebeat", + BinaryPath: filepath.Join("..", "..", "specs", "filebeat"), + }, + Units: []Unit{ + { + ID: "log-stashit", + Type: client.UnitTypeOutput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "shipper", + }), + }, + { + ID: "log-stashit-logfile-3", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: mustExpectedConfigForceType(map[string]interface{}{ + "type": "log", + "id": "logfile-3", + }, "log"), + }, + }, + ShipperRef: &ShipperReference{ + ShipperType: "shipper", + ComponentID: "shipper-stashit", + UnitID: "log-stashit", + }, + }, + { + ID: "shipper-stashit", + OutputType: "logstash", + ShipperSpec: &ShipperRuntimeSpec{ + ShipperType: "shipper", + BinaryName: "shipper", + BinaryPath: filepath.Join("..", "..", "specs", "shipper"), + }, + Units: []Unit{ + { + ID: "log-stashit", + Type: client.UnitTypeInput, + 
LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "id": "log-stashit", + "type": "shipper", + "units": []interface{}{ + map[string]interface{}{ + "id": "log-stashit-logfile-3", + "config": map[string]interface{}{ + "type": "log", + "id": "logfile-3", + }, + }, + }, + }), + }, + { + ID: "shipper-stashit", + Type: client.UnitTypeOutput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "logstash", + }), + }, + }, + }, + { + ID: "log-redis", + InputType: "log", + OutputType: "redis", + InputSpec: &InputRuntimeSpec{ + InputType: "log", + BinaryName: "filebeat", + BinaryPath: filepath.Join("..", "..", "specs", "filebeat"), + }, + Units: []Unit{ + { + ID: "log-redis", + Type: client.UnitTypeOutput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "shipper", + }), + }, + { + ID: "log-redis-logfile-4", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: mustExpectedConfigForceType(map[string]interface{}{ + "type": "log", + "id": "logfile-4", + }, "log"), + }, + }, + ShipperRef: &ShipperReference{ + ShipperType: "shipper", + ComponentID: "shipper-redis", + UnitID: "log-redis", + }, + }, + { + ID: "shipper-redis", + OutputType: "redis", + ShipperSpec: &ShipperRuntimeSpec{ + ShipperType: "shipper", + BinaryName: "shipper", + BinaryPath: filepath.Join("..", "..", "specs", "shipper"), + }, + Units: []Unit{ + { + ID: "log-redis", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "id": "log-redis", + "type": "shipper", + "units": []interface{}{ + map[string]interface{}{ + "id": "log-redis-logfile-4", + "config": map[string]interface{}{ + "type": "log", + "id": "logfile-4", + }, + }, + }, + }), + }, + { + ID: "shipper-redis", + Type: client.UnitTypeOutput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "redis", + }), + 
}, + }, + }, + { + ID: "apm-default", + InputType: "apm", + OutputType: "elasticsearch", + InputSpec: &InputRuntimeSpec{ + InputType: "apm", + BinaryName: "apm-server", + BinaryPath: filepath.Join("..", "..", "specs", "apm-server"), + }, + Units: []Unit{ + { + ID: "apm-default", + Type: client.UnitTypeOutput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "elasticsearch", + }), + }, + { + ID: "apm-default-apm-server-0", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "apm", + "id": "apm-server-0", + }), + }, + }, + }, + }, + }, + { + Name: "Complex w/ shipper (isolated units)", + Platform: linuxAMD64Platform, + Policy: map[string]interface{}{ + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "type": "elasticsearch", + "shipper": map[string]interface{}{ + "enabled": true, + }, }, "other": map[string]interface{}{ "type": "elasticsearch", @@ -1170,51 +2504,51 @@ func TestToComponents(t *testing.T) { }, "inputs": []interface{}{ map[string]interface{}{ - "type": "filestream", - "id": "filestream-0", + "type": "cloudbeat", + "id": "cloudbeat-0", }, map[string]interface{}{ - "type": "filestream", - "id": "filestream-1", + "type": "cloudbeat", + "id": "cloudbeat-1", }, map[string]interface{}{ - "type": "filestream", - "id": "filestream-2", + "type": "cloudbeat", + "id": "cloudbeat-2", "enabled": false, }, map[string]interface{}{ - "type": "filestream", - "id": "filestream-3", + "type": "cloudbeat", + "id": "cloudbeat-3", "use_output": "other", }, map[string]interface{}{ - "type": "filestream", - "id": "filestream-4", + "type": "cloudbeat", + "id": "cloudbeat-4", "use_output": "other", }, map[string]interface{}{ - "type": "logfile", - "id": "logfile-0", + "type": "cloudbeat", + "id": "cloudbeat-5", "use_output": "default", }, map[string]interface{}{ - "type": "log", - "id": "logfile-1", + "type": "cloudbeat", + "id": 
"cloudbeat-6", "use_output": "default", }, map[string]interface{}{ - "type": "logfile", - "id": "logfile-2", + "type": "cloudbeat", + "id": "cloudbeat-7", "use_output": "other", }, map[string]interface{}{ - "type": "logfile", - "id": "logfile-3", + "type": "cloudbeat", + "id": "cloudbeat-8", "use_output": "stashit", }, map[string]interface{}{ - "type": "logfile", - "id": "logfile-4", + "type": "cloudbeat", + "id": "cloudbeat-9", "use_output": "redis", }, map[string]interface{}{ @@ -1225,17 +2559,17 @@ func TestToComponents(t *testing.T) { }, Result: []Component{ { - ID: "filestream-default", - InputType: "filestream", + ID: "cloudbeat-default-cloudbeat-0", + InputType: "cloudbeat", OutputType: "elasticsearch", InputSpec: &InputRuntimeSpec{ - InputType: "filestream", - BinaryName: "filebeat", - BinaryPath: filepath.Join("..", "..", "specs", "filebeat"), + InputType: "cloudbeat", + BinaryName: "cloudbeat", + BinaryPath: filepath.Join("..", "..", "specs", "cloudbeat"), }, Units: []Unit{ { - ID: "filestream-default", + ID: "cloudbeat-default-cloudbeat-0", Type: client.UnitTypeOutput, LogLevel: defaultUnitLogLevel, Config: MustExpectedConfig(map[string]interface{}{ @@ -1243,42 +2577,67 @@ func TestToComponents(t *testing.T) { }), }, { - ID: "filestream-default-filestream-0", + ID: "cloudbeat-default-cloudbeat-0-unit", Type: client.UnitTypeInput, LogLevel: defaultUnitLogLevel, Config: MustExpectedConfig(map[string]interface{}{ - "type": "filestream", - "id": "filestream-0", + "type": "cloudbeat", + "id": "cloudbeat-0", }), }, + }, + ShipperRef: &ShipperReference{ + ShipperType: "shipper", + ComponentID: "shipper-default", + UnitID: "cloudbeat-default-cloudbeat-0", + }, + }, + { + ID: "cloudbeat-default-cloudbeat-1", + InputType: "cloudbeat", + OutputType: "elasticsearch", + InputSpec: &InputRuntimeSpec{ + InputType: "cloudbeat", + BinaryName: "cloudbeat", + BinaryPath: filepath.Join("..", "..", "specs", "cloudbeat"), + }, + Units: []Unit{ { - ID: 
"filestream-default-filestream-1", + ID: "cloudbeat-default-cloudbeat-1", + Type: client.UnitTypeOutput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "shipper", + }), + }, + { + ID: "cloudbeat-default-cloudbeat-1-unit", Type: client.UnitTypeInput, LogLevel: defaultUnitLogLevel, Config: MustExpectedConfig(map[string]interface{}{ - "type": "filestream", - "id": "filestream-1", + "type": "cloudbeat", + "id": "cloudbeat-1", }), }, }, ShipperRef: &ShipperReference{ ShipperType: "shipper", ComponentID: "shipper-default", - UnitID: "filestream-default", + UnitID: "cloudbeat-default-cloudbeat-1", }, }, { - ID: "filestream-other", - InputType: "filestream", + ID: "cloudbeat-other-cloudbeat-3", + InputType: "cloudbeat", OutputType: "elasticsearch", InputSpec: &InputRuntimeSpec{ - InputType: "filestream", - BinaryName: "filebeat", - BinaryPath: filepath.Join("..", "..", "specs", "filebeat"), + InputType: "cloudbeat", + BinaryName: "cloudbeat", + BinaryPath: filepath.Join("..", "..", "specs", "cloudbeat"), }, Units: []Unit{ { - ID: "filestream-other", + ID: "cloudbeat-other-cloudbeat-3", Type: client.UnitTypeOutput, LogLevel: defaultUnitLogLevel, Config: MustExpectedConfig(map[string]interface{}{ @@ -1286,37 +2645,57 @@ func TestToComponents(t *testing.T) { }), }, { - ID: "filestream-other-filestream-3", + ID: "cloudbeat-other-cloudbeat-3-unit", Type: client.UnitTypeInput, LogLevel: defaultUnitLogLevel, Config: MustExpectedConfig(map[string]interface{}{ - "type": "filestream", - "id": "filestream-3", + "type": "cloudbeat", + "id": "cloudbeat-3", }), }, + }, + }, + { + ID: "cloudbeat-other-cloudbeat-4", + InputType: "cloudbeat", + OutputType: "elasticsearch", + InputSpec: &InputRuntimeSpec{ + InputType: "cloudbeat", + BinaryName: "cloudbeat", + BinaryPath: filepath.Join("..", "..", "specs", "cloudbeat"), + }, + Units: []Unit{ { - ID: "filestream-other-filestream-4", + ID: "cloudbeat-other-cloudbeat-4", + Type: 
client.UnitTypeOutput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "elasticsearch", + }), + }, + { + ID: "cloudbeat-other-cloudbeat-4-unit", Type: client.UnitTypeInput, LogLevel: defaultUnitLogLevel, Config: MustExpectedConfig(map[string]interface{}{ - "type": "filestream", - "id": "filestream-4", + "type": "cloudbeat", + "id": "cloudbeat-4", }), }, }, }, { - ID: "log-default", - InputType: "log", + ID: "cloudbeat-default-cloudbeat-5", + InputType: "cloudbeat", OutputType: "elasticsearch", InputSpec: &InputRuntimeSpec{ - InputType: "log", - BinaryName: "filebeat", - BinaryPath: filepath.Join("..", "..", "specs", "filebeat"), + InputType: "cloudbeat", + BinaryName: "cloudbeat", + BinaryPath: filepath.Join("..", "..", "specs", "cloudbeat"), }, Units: []Unit{ { - ID: "log-default", + ID: "cloudbeat-default-cloudbeat-5", Type: client.UnitTypeOutput, LogLevel: defaultUnitLogLevel, Config: MustExpectedConfig(map[string]interface{}{ @@ -1324,28 +2703,53 @@ func TestToComponents(t *testing.T) { }), }, { - ID: "log-default-logfile-0", + ID: "cloudbeat-default-cloudbeat-5-unit", Type: client.UnitTypeInput, LogLevel: defaultUnitLogLevel, Config: mustExpectedConfigForceType(map[string]interface{}{ - "type": "log", - "id": "logfile-0", - }, "log"), + "type": "cloudbeat", + "id": "cloudbeat-5", + }, "cloudbeat"), }, + }, + ShipperRef: &ShipperReference{ + ShipperType: "shipper", + ComponentID: "shipper-default", + UnitID: "cloudbeat-default-cloudbeat-5", + }, + }, + { + ID: "cloudbeat-default-cloudbeat-6", + InputType: "cloudbeat", + OutputType: "elasticsearch", + InputSpec: &InputRuntimeSpec{ + InputType: "cloudbeat", + BinaryName: "cloudbeat", + BinaryPath: filepath.Join("..", "..", "specs", "cloudbeat"), + }, + Units: []Unit{ { - ID: "log-default-logfile-1", - Type: client.UnitTypeInput, + ID: "cloudbeat-default-cloudbeat-6", + Type: client.UnitTypeOutput, LogLevel: defaultUnitLogLevel, Config: 
MustExpectedConfig(map[string]interface{}{ - "type": "log", - "id": "logfile-1", + "type": "shipper", }), }, + { + ID: "cloudbeat-default-cloudbeat-6-unit", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: mustExpectedConfigForceType(map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-6", + }, "cloudbeat"), + }, }, ShipperRef: &ShipperReference{ ShipperType: "shipper", ComponentID: "shipper-default", - UnitID: "log-default", + UnitID: "cloudbeat-default-cloudbeat-6", }, }, { @@ -1358,50 +2762,72 @@ func TestToComponents(t *testing.T) { }, Units: []Unit{ { - ID: "filestream-default", + ID: "cloudbeat-default-cloudbeat-0", Type: client.UnitTypeInput, LogLevel: defaultUnitLogLevel, Config: MustExpectedConfig(map[string]interface{}{ - "id": "filestream-default", + "id": "cloudbeat-default-cloudbeat-0", "type": "shipper", "units": []interface{}{ map[string]interface{}{ - "id": "filestream-default-filestream-0", + "id": "cloudbeat-default-cloudbeat-0-unit", "config": map[string]interface{}{ - "type": "filestream", - "id": "filestream-0", + "type": "cloudbeat", + "id": "cloudbeat-0", }, }, + }, + }), + }, + { + ID: "cloudbeat-default-cloudbeat-1", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "id": "cloudbeat-default-cloudbeat-1", + "type": "shipper", + "units": []interface{}{ map[string]interface{}{ - "id": "filestream-default-filestream-1", + "id": "cloudbeat-default-cloudbeat-1-unit", "config": map[string]interface{}{ - "type": "filestream", - "id": "filestream-1", + "type": "cloudbeat", + "id": "cloudbeat-1", }, }, }, }), }, { - ID: "log-default", + ID: "cloudbeat-default-cloudbeat-5", Type: client.UnitTypeInput, LogLevel: defaultUnitLogLevel, Config: MustExpectedConfig(map[string]interface{}{ - "id": "log-default", + "id": "cloudbeat-default-cloudbeat-5", "type": "shipper", "units": []interface{}{ map[string]interface{}{ - "id": "log-default-logfile-0", + 
"id": "cloudbeat-default-cloudbeat-5-unit", "config": map[string]interface{}{ - "type": "log", - "id": "logfile-0", + "type": "cloudbeat", + "id": "cloudbeat-5", }, }, + }, + }), + }, + { + ID: "cloudbeat-default-cloudbeat-6", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "id": "cloudbeat-default-cloudbeat-6", + "type": "shipper", + "units": []interface{}{ map[string]interface{}{ - "id": "log-default-logfile-1", + "id": "cloudbeat-default-cloudbeat-6-unit", "config": map[string]interface{}{ - "type": "log", - "id": "logfile-1", + "type": "cloudbeat", + "id": "cloudbeat-6", }, }, }, @@ -1418,17 +2844,17 @@ func TestToComponents(t *testing.T) { }, }, { - ID: "log-other", - InputType: "log", + ID: "cloudbeat-other-cloudbeat-7", + InputType: "cloudbeat", OutputType: "elasticsearch", InputSpec: &InputRuntimeSpec{ - InputType: "log", - BinaryName: "filebeat", - BinaryPath: filepath.Join("..", "..", "specs", "filebeat"), + InputType: "cloudbeat", + BinaryName: "cloudbeat", + BinaryPath: filepath.Join("..", "..", "specs", "cloudbeat"), }, Units: []Unit{ { - ID: "log-other", + ID: "cloudbeat-other-cloudbeat-7", Type: client.UnitTypeOutput, LogLevel: defaultUnitLogLevel, Config: MustExpectedConfig(map[string]interface{}{ @@ -1436,28 +2862,28 @@ func TestToComponents(t *testing.T) { }), }, { - ID: "log-other-logfile-2", + ID: "cloudbeat-other-cloudbeat-7-unit", Type: client.UnitTypeInput, LogLevel: defaultUnitLogLevel, Config: mustExpectedConfigForceType(map[string]interface{}{ - "type": "log", - "id": "logfile-2", - }, "log"), + "type": "cloudbeat", + "id": "cloudbeat-7", + }, "cloudbeat"), }, }, }, { - ID: "log-stashit", - InputType: "log", + ID: "cloudbeat-stashit-cloudbeat-8", + InputType: "cloudbeat", OutputType: "logstash", InputSpec: &InputRuntimeSpec{ - InputType: "log", - BinaryName: "filebeat", - BinaryPath: filepath.Join("..", "..", "specs", "filebeat"), + InputType: "cloudbeat", + BinaryName: 
"cloudbeat", + BinaryPath: filepath.Join("..", "..", "specs", "cloudbeat"), }, Units: []Unit{ { - ID: "log-stashit", + ID: "cloudbeat-stashit-cloudbeat-8", Type: client.UnitTypeOutput, LogLevel: defaultUnitLogLevel, Config: MustExpectedConfig(map[string]interface{}{ @@ -1465,19 +2891,19 @@ func TestToComponents(t *testing.T) { }), }, { - ID: "log-stashit-logfile-3", + ID: "cloudbeat-stashit-cloudbeat-8-unit", Type: client.UnitTypeInput, LogLevel: defaultUnitLogLevel, Config: mustExpectedConfigForceType(map[string]interface{}{ - "type": "log", - "id": "logfile-3", - }, "log"), + "type": "cloudbeat", + "id": "cloudbeat-8", + }, "cloudbeat"), }, }, ShipperRef: &ShipperReference{ ShipperType: "shipper", ComponentID: "shipper-stashit", - UnitID: "log-stashit", + UnitID: "cloudbeat-stashit-cloudbeat-8", }, }, { @@ -1490,18 +2916,18 @@ func TestToComponents(t *testing.T) { }, Units: []Unit{ { - ID: "log-stashit", + ID: "cloudbeat-stashit-cloudbeat-8", Type: client.UnitTypeInput, LogLevel: defaultUnitLogLevel, Config: MustExpectedConfig(map[string]interface{}{ - "id": "log-stashit", + "id": "cloudbeat-stashit-cloudbeat-8", "type": "shipper", "units": []interface{}{ map[string]interface{}{ - "id": "log-stashit-logfile-3", + "id": "cloudbeat-stashit-cloudbeat-8-unit", "config": map[string]interface{}{ - "type": "log", - "id": "logfile-3", + "type": "cloudbeat", + "id": "cloudbeat-8", }, }, }, @@ -1518,17 +2944,17 @@ func TestToComponents(t *testing.T) { }, }, { - ID: "log-redis", - InputType: "log", + ID: "cloudbeat-redis-cloudbeat-9", + InputType: "cloudbeat", OutputType: "redis", InputSpec: &InputRuntimeSpec{ - InputType: "log", - BinaryName: "filebeat", - BinaryPath: filepath.Join("..", "..", "specs", "filebeat"), + InputType: "cloudbeat", + BinaryName: "cloudbeat", + BinaryPath: filepath.Join("..", "..", "specs", "cloudbeat"), }, Units: []Unit{ { - ID: "log-redis", + ID: "cloudbeat-redis-cloudbeat-9", Type: client.UnitTypeOutput, LogLevel: defaultUnitLogLevel, Config: 
MustExpectedConfig(map[string]interface{}{ @@ -1536,19 +2962,19 @@ func TestToComponents(t *testing.T) { }), }, { - ID: "log-redis-logfile-4", + ID: "cloudbeat-redis-cloudbeat-9-unit", Type: client.UnitTypeInput, LogLevel: defaultUnitLogLevel, Config: mustExpectedConfigForceType(map[string]interface{}{ - "type": "log", - "id": "logfile-4", - }, "log"), + "type": "cloudbeat", + "id": "cloudbeat-9", + }, "cloudbeat"), }, }, ShipperRef: &ShipperReference{ ShipperType: "shipper", ComponentID: "shipper-redis", - UnitID: "log-redis", + UnitID: "cloudbeat-redis-cloudbeat-9", }, }, { @@ -1561,18 +2987,18 @@ func TestToComponents(t *testing.T) { }, Units: []Unit{ { - ID: "log-redis", + ID: "cloudbeat-redis-cloudbeat-9", Type: client.UnitTypeInput, LogLevel: defaultUnitLogLevel, Config: MustExpectedConfig(map[string]interface{}{ - "id": "log-redis", + "id": "cloudbeat-redis-cloudbeat-9", "type": "shipper", "units": []interface{}{ map[string]interface{}{ - "id": "log-redis-logfile-4", + "id": "cloudbeat-redis-cloudbeat-9-unit", "config": map[string]interface{}{ - "type": "log", - "id": "logfile-4", + "type": "cloudbeat", + "id": "cloudbeat-9", }, }, }, @@ -1737,6 +3163,61 @@ func TestToComponents(t *testing.T) { "header-one": "val-1", }}, }, + { + Name: "Headers injection (isolated units)", + Platform: linuxAMD64Platform, + Policy: map[string]interface{}{ + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "type": "elasticsearch", + "enabled": true, + }, + }, + "inputs": []interface{}{ + map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-0", + "enabled": true, + }, + }, + }, + Result: []Component{ + { + InputType: "cloudbeat", + OutputType: "elasticsearch", + InputSpec: &InputRuntimeSpec{ + InputType: "cloudbeat", + BinaryName: "cloudbeat", + BinaryPath: filepath.Join("..", "..", "specs", "cloudbeat"), + }, + Units: []Unit{ + { + ID: "cloudbeat-default-cloudbeat-0", + Type: client.UnitTypeOutput, + LogLevel: defaultUnitLogLevel, + 
Config: MustExpectedConfig(map[string]interface{}{ + "type": "elasticsearch", + "headers": map[string]interface{}{ + "cloud": "beat", + }, + }), + }, + { + ID: "cloudbeat-default-cloudbeat-0-unit", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-0", + }), + }, + }, + }, + }, + headers: &testHeadersProvider{headers: map[string]string{ + "cloud": "beat", + }}, + }, { Name: "Headers injection merge", Platform: linuxAMD64Platform, @@ -1796,6 +3277,65 @@ func TestToComponents(t *testing.T) { "header-one": "val-1", }}, }, + { + Name: "Headers injection merge (isolated units)", + Platform: linuxAMD64Platform, + Policy: map[string]interface{}{ + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "type": "elasticsearch", + "enabled": true, + "headers": map[string]interface{}{ + "cloud1": "beat1", + }, + }, + }, + "inputs": []interface{}{ + map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-0", + "enabled": true, + }, + }, + }, + Result: []Component{ + { + InputType: "cloudbeat", + OutputType: "elasticsearch", + InputSpec: &InputRuntimeSpec{ + InputType: "cloudbeat", + BinaryName: "cloudbeat", + BinaryPath: filepath.Join("..", "..", "specs", "cloudbeat"), + }, + Units: []Unit{ + { + ID: "cloudbeat-default-cloudbeat-0", + Type: client.UnitTypeOutput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "elasticsearch", + "headers": map[string]interface{}{ + "cloud1": "beat1", + "cloud2": "beat2", + }, + }), + }, + { + ID: "cloudbeat-default-cloudbeat-0-unit", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-0", + }), + }, + }, + }, + }, + headers: &testHeadersProvider{headers: map[string]string{ + "cloud2": "beat2", + }}, + }, { Name: "Headers injection not injecting kafka", 
Platform: linuxAMD64Platform, @@ -1848,6 +3388,58 @@ func TestToComponents(t *testing.T) { "header-one": "val-1", }}, }, + { + Name: "Headers injection not injecting kafka (isolated units)", + Platform: linuxAMD64Platform, + Policy: map[string]interface{}{ + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "type": "kafka", + "enabled": true, + }, + }, + "inputs": []interface{}{ + map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-0", + "enabled": true, + }, + }, + }, + Result: []Component{ + { + InputType: "cloudbeat", + OutputType: "kafka", + InputSpec: &InputRuntimeSpec{ + InputType: "cloudbeat", + BinaryName: "cloudbeat", + BinaryPath: filepath.Join("..", "..", "specs", "cloudbeat"), + }, + Units: []Unit{ + { + ID: "cloudbeat-default-cloudbeat-0", + Type: client.UnitTypeOutput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "kafka", + }), + }, + { + ID: "cloudbeat-default-cloudbeat-0-unit", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-0", + }), + }, + }, + }, + }, + headers: &testHeadersProvider{headers: map[string]string{ + "cloud": "beat", + }}, + }, { Name: "Headers injection not injecting logstash", Platform: linuxAMD64Platform, @@ -1900,6 +3492,58 @@ func TestToComponents(t *testing.T) { "header-one": "val-1", }}, }, + { + Name: "Headers injection not injecting logstash (isolated units)", + Platform: linuxAMD64Platform, + Policy: map[string]interface{}{ + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "type": "logstash", + "enabled": true, + }, + }, + "inputs": []interface{}{ + map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-0", + "enabled": true, + }, + }, + }, + Result: []Component{ + { + InputType: "cloudbeat", + OutputType: "logstash", + InputSpec: &InputRuntimeSpec{ + InputType: "cloudbeat", + BinaryName: 
"cloudbeat", + BinaryPath: filepath.Join("..", "..", "specs", "cloudbeat"), + }, + Units: []Unit{ + { + ID: "cloudbeat-default-cloudbeat-0", + Type: client.UnitTypeOutput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "logstash", + }), + }, + { + ID: "cloudbeat-default-cloudbeat-0-unit", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "cloudbeat", + "id": "cloudbeat-0", + }), + }, + }, + }, + }, + headers: &testHeadersProvider{headers: map[string]string{ + "cloud": "beat", + }}, + }, } for _, scenario := range scenarios { @@ -2458,3 +4102,108 @@ func TestFlattenedDataStream(t *testing.T) { t.Errorf("expecting DataStream.Namespace: %q, got: %q", expectedNamespace, dataStream.Namespace) } } + +func TestFlattenedDataStreamIsolatedUnits(t *testing.T) { + id0 := "cloudbeat-0" + id1 := "cloudbeat-1" + expectedNamespace := map[string]string{ + id0: "test-namespace-0", + id1: "test-namespace-1", + } + expectedType := map[string]string{ + id0: "test-type-0", + id1: "test-type-1", + } + expectedDataset := map[string]string{ + id0: "test-dataset-0", + id1: "test-dataset-1", + } + + policy := map[string]any{ + "outputs": map[string]any{ + "default": map[string]any{ + "type": "elasticsearch", + "enabled": true, + }, + }, + "inputs": []any{ + map[string]any{ + "type": "cloudbeat", + "id": id0, + "enabled": true, + "data_stream.type": expectedType[id0], + "data_stream.dataset": expectedDataset[id0], + "data_stream": map[string]any{ + "namespace": expectedNamespace[id0], + }, + }, + map[string]any{ + "type": "cloudbeat", + "id": id1, + "enabled": true, + "data_stream.type": expectedType[id1], + "data_stream.dataset": expectedDataset[id1], + "data_stream": map[string]any{ + "namespace": expectedNamespace[id1], + }, + }, + }, + } + + linuxAMD64Platform := PlatformDetail{ + Platform: Platform{ + OS: Linux, + Arch: AMD64, + GOOS: Linux, + }, + } + + runtime, 
err := LoadRuntimeSpecs(filepath.Join("..", "..", "specs"), linuxAMD64Platform, SkipBinaryCheck()) + if err != nil { + t.Fatalf("cannot load runtime specs: %s", err) + } + + result, err := runtime.ToComponents(policy, nil, logp.DebugLevel, nil) + if err != nil { + t.Fatalf("cannot convert policy to component: %s", err) + } + + if len(result) != 2 { + t.Fatalf("expecting result to have one element, got %d", len(result)) + } + + for _, component := range result { + if len(component.Units) != 2 { + t.Fatalf("expecting component.Units to have two elements, got %d", len(component.Units)) + } + + // We do not make assumptions about ordering. + // Get the input Unit + var dataStream *proto.DataStream + for _, unit := range component.Units { + if unit.Err != nil { + t.Fatalf("unit.Err: %s", unit.Err) + } + if unit.Type == client.UnitTypeInput { + dataStream = unit.Config.DataStream + break + } + } + + if dataStream == nil { + t.Fatal("DataStream cannot be nil") + } + + currentId := component.ID[len(component.ID)-len(id0):] + + if dataStream.Dataset != expectedDataset[currentId] { + t.Errorf("expecting DataStream.Dataset: %q, got: %q", expectedDataset[currentId], dataStream.Dataset) + } + if dataStream.Type != expectedType[currentId] { + t.Errorf("expecting DataStream.Type: %q, got: %q", expectedType[currentId], dataStream.Type) + } + if dataStream.Namespace != expectedNamespace[currentId] { + t.Errorf("expecting DataStream.Namespace: %q, got: %q", expectedNamespace[currentId], dataStream.Namespace) + } + } +} diff --git a/pkg/component/fake/component/comp/actions.go b/pkg/component/fake/component/comp/actions.go index 9b4dc74c0b2..8d211653044 100644 --- a/pkg/component/fake/component/comp/actions.go +++ b/pkg/component/fake/component/comp/actions.go @@ -133,7 +133,7 @@ func newRunningUnit(logger zerolog.Logger, manager *StateManager, unit *client.U expected.Config.Type) } switch expected.Config.Type { - case Fake: + case Fake, FakeIsolatedUnits: return newFakeInput(logger, 
expected.LogLevel, manager, unit, expected.Config) case APM: return newFakeAPMInput(logger, expected.LogLevel, unit) diff --git a/pkg/component/fake/component/comp/component.go b/pkg/component/fake/component/comp/component.go index 837689f912f..dac43114f74 100644 --- a/pkg/component/fake/component/comp/component.go +++ b/pkg/component/fake/component/comp/component.go @@ -27,9 +27,10 @@ import ( ) const ( - Fake = "fake" - fakeShipper = "fake-shipper" - APM = "fake-apm" + Fake = "fake" + FakeIsolatedUnits = "fake-isolated-units" + fakeShipper = "fake-shipper" + APM = "fake-apm" configuringMsg = "Configuring" stoppingMsg = "Stopping" @@ -386,7 +387,7 @@ func (f *fakeInput) Update(u *client.Unit, triggers client.Trigger) error { if expected.Config.Type == "" { return fmt.Errorf("unit missing config type") } - if expected.Config.Type != Fake { + if expected.Config.Type != Fake && expected.Config.Type != FakeIsolatedUnits { return fmt.Errorf("unit type changed with the same unit ID: %s", expected.Config.Type) } diff --git a/pkg/component/fake/shipper/main.go b/pkg/component/fake/shipper/main.go index 5dcd440ecc6..0ee90f9717f 100644 --- a/pkg/component/fake/shipper/main.go +++ b/pkg/component/fake/shipper/main.go @@ -106,14 +106,50 @@ type unitKey struct { unitID string } +type CachedServer struct { + cfg common.FakeShipperConfig + srv *grpc.Server + wg errgroup.Group + ref int +} + +// inc increments the reference count of the server +// it is used to keep the server running as long as there are units using it +// the server is stopped when the reference count reaches 0 +// this is protected by the unitsMx mutex +func (c *CachedServer) incRef() { + c.ref++ +} + +// dec decrements the reference count of the server and stops it if the reference count reaches 0 +// it returns true if the server was stopped and should be removed from the cache or false if the server is still in use +// this is protected by the unitsMx mutex +func (c *CachedServer) decRef() bool { + c.ref-- 
+ if c.ref == 0 { + // safeguard, should not happen + if c.srv != nil { + c.srv.Stop() + // left the previous implementation as is, without the error handling + _ = c.wg.Wait() + } + // remove the server from the cache + return true + } + // server is still in use + return false +} + type stateManager struct { logger zerolog.Logger unitsMx sync.RWMutex units map[unitKey]runningUnit + // Protected from concurrent access by unitsMx mutex + serverCache map[string]*CachedServer } func newStateManager(logger zerolog.Logger) *stateManager { - return &stateManager{logger: logger, units: make(map[unitKey]runningUnit)} + return &stateManager{logger: logger, units: make(map[unitKey]runningUnit), serverCache: make(map[string]*CachedServer)} } func (s *stateManager) added(unit *client.Unit) { @@ -329,8 +365,7 @@ type fakeShipperInput struct { unit *client.Unit cfg *proto.UnitExpectedConfig - srv *grpc.Server - wg errgroup.Group + srv string } func newFakeShipperInput(logger zerolog.Logger, logLevel client.UnitLogLevel, manager *stateManager, unit *client.Unit, cfg *proto.UnitExpectedConfig) (*fakeShipperInput, error) { @@ -348,24 +383,36 @@ func newFakeShipperInput(logger zerolog.Logger, logLevel client.UnitLogLevel, ma return nil, err } - logger.Info().Str("server", srvCfg.Server).Msg("starting GRPC fake shipper server") - lis, err := createListener(srvCfg.Server) - if err != nil { - return nil, err - } - if srvCfg.TLS == nil || srvCfg.TLS.Cert == "" || srvCfg.TLS.Key == "" { - return nil, fmt.Errorf("ssl configuration missing") - } - cert, err := tls.X509KeyPair([]byte(srvCfg.TLS.Cert), []byte(srvCfg.TLS.Key)) - if err != nil { - return nil, err + // This access if protected by the unitsMx + cachedSrv, ok := manager.serverCache[srvCfg.Server] + if ok { + // Assuming that the server is already running and units are using the same TLS configuration + logger.Info().Str("server", srvCfg.Server).Msg("using existing GRPC fake shipper server") + cachedSrv.incRef() + i.srv = 
srvCfg.Server + } else { + logger.Info().Str("server", srvCfg.Server).Msg("starting GRPC fake shipper server") + lis, err := createListener(srvCfg.Server) + if err != nil { + return nil, err + } + if srvCfg.TLS == nil || srvCfg.TLS.Cert == "" || srvCfg.TLS.Key == "" { + return nil, fmt.Errorf("ssl configuration missing") + } + cert, err := tls.X509KeyPair([]byte(srvCfg.TLS.Cert), []byte(srvCfg.TLS.Key)) + if err != nil { + return nil, err + } + srv := grpc.NewServer(grpc.Creds(credentials.NewServerTLSFromCert(&cert))) + i.srv = srvCfg.Server + common.RegisterFakeEventProtocolServer(srv, i) + cSrv := &CachedServer{cfg: srvCfg, srv: srv, ref: 1} + // Launch the server in a goroutine + cSrv.wg.Go(func() error { + return srv.Serve(lis) + }) + manager.serverCache[srvCfg.Server] = cSrv } - srv := grpc.NewServer(grpc.Creds(credentials.NewServerTLSFromCert(&cert))) - i.srv = srv - common.RegisterFakeEventProtocolServer(srv, i) - i.wg.Go(func() error { - return srv.Serve(lis) - }) logger.Trace().Msg("registering kill action for unit") unit.RegisterAction(&killAction{logger}) @@ -394,10 +441,9 @@ func (f *fakeShipperInput) Update(u *client.Unit) error { _ = u.UpdateState(client.UnitStateStopping, stoppingMsg, nil) go func() { - if f.srv != nil { - f.srv.Stop() - _ = f.wg.Wait() - f.srv = nil + cSrv, ok := f.manager.serverCache[f.srv] + if ok && cSrv.decRef() { + delete(f.manager.serverCache, f.srv) } f.logger.Debug(). Str("state", client.UnitStateStopped.String()). 
diff --git a/pkg/component/input_spec.go b/pkg/component/input_spec.go index 1f19d53a5de..f5ec8f76250 100644 --- a/pkg/component/input_spec.go +++ b/pkg/component/input_spec.go @@ -21,8 +21,9 @@ type InputSpec struct { Shippers []string `config:"shippers,omitempty" yaml:"shippers,omitempty"` Runtime RuntimeSpec `config:"runtime,omitempty" yaml:"runtime,omitempty"` - Command *CommandSpec `config:"command,omitempty" yaml:"command,omitempty"` - Service *ServiceSpec `config:"service,omitempty" yaml:"service,omitempty"` + Command *CommandSpec `config:"command,omitempty" yaml:"command,omitempty"` + Service *ServiceSpec `config:"service,omitempty" yaml:"service,omitempty"` + IsolateUnits bool `config:"isolate_units,omitempty" yaml:"isolate_units,omitempty"` } // Validate ensures correctness of input specification. diff --git a/specs/cloudbeat.spec.yml b/specs/cloudbeat.spec.yml index 966bd445ca9..99231ee623f 100644 --- a/specs/cloudbeat.spec.yml +++ b/specs/cloudbeat.spec.yml @@ -7,7 +7,6 @@ inputs: - linux/arm64 - darwin/amd64 - darwin/arm64 - - windows/amd64 - container/amd64 - container/arm64 outputs: &outputs @@ -15,6 +14,9 @@ inputs: - kafka - logstash - redis + # Introduced for isolated units tests, not used by cloudbeat + shippers: &shippers + - shipper command: &command restart_monitoring_period: 5s maximum_restarts_per_period: 1 @@ -35,33 +37,46 @@ inputs: - "logging.to_stderr=true" - "-E" - "gc_percent=${CLOUDBEAT_GOGC:100}" + isolate_units: true - name: cloudbeat/cis_k8s description: "CIS Kubernetes monitoring" platforms: *platforms outputs: *outputs + shippers: *shippers command: *command + isolate_units: true - name: cloudbeat/cis_eks description: "CIS elastic Kubernetes monitoring" platforms: *platforms outputs: *outputs + shippers: *shippers command: *command + isolate_units: true - name: cloudbeat/cis_aws description: "CIS AWS monitoring" platforms: *platforms outputs: *outputs + shippers: *shippers command: *command + isolate_units: true - name: 
cloudbeat/cis_gcp description: "CIS GCP monitoring" platforms: *platforms outputs: *outputs + shippers: *shippers command: *command + isolate_units: true - name: cloudbeat/cis_azure description: "CIS AZURE monitoring" platforms: *platforms outputs: *outputs + shippers: *shippers command: *command + isolate_units: true - name: cloudbeat/vuln_mgmt_aws description: "AWS Vulnerabilities management" platforms: *platforms outputs: *outputs + shippers: *shippers command: *command + isolate_units: true diff --git a/testing/integration/diagnostics_test.go b/testing/integration/diagnostics_test.go index de10c1dc761..24381f67946 100644 --- a/testing/integration/diagnostics_test.go +++ b/testing/integration/diagnostics_test.go @@ -82,6 +82,45 @@ var componentSetup = map[string]integrationtest.ComponentState{ }, } +var isolatedUnitsComponentSetup = map[string]integrationtest.ComponentState{ + "fake-isolated-units-default-fake-isolated-units": { + State: integrationtest.NewClientState(client.Healthy), + Units: map[integrationtest.ComponentUnitKey]integrationtest.ComponentUnitState{ + integrationtest.ComponentUnitKey{UnitType: client.UnitTypeOutput, UnitID: "fake-isolated-units-default-fake-isolated-units"}: { + State: integrationtest.NewClientState(client.Healthy), + }, + integrationtest.ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "fake-isolated-units-default-fake-isolated-units-unit"}: { + State: integrationtest.NewClientState(client.Healthy), + }, + }, + }, + "fake-isolated-units-default-fake-isolated-units-1": { + State: integrationtest.NewClientState(client.Healthy), + Units: map[integrationtest.ComponentUnitKey]integrationtest.ComponentUnitState{ + integrationtest.ComponentUnitKey{UnitType: client.UnitTypeOutput, UnitID: "fake-isolated-units-default-fake-isolated-units-1"}: { + State: integrationtest.NewClientState(client.Healthy), + }, + integrationtest.ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: 
"fake-isolated-units-default-fake-isolated-units-1-unit"}: { + State: integrationtest.NewClientState(client.Healthy), + }, + }, + }, + "fake-shipper-default": { + State: integrationtest.NewClientState(client.Healthy), + Units: map[integrationtest.ComponentUnitKey]integrationtest.ComponentUnitState{ + integrationtest.ComponentUnitKey{UnitType: client.UnitTypeOutput, UnitID: "fake-shipper-default"}: { + State: integrationtest.NewClientState(client.Healthy), + }, + integrationtest.ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "fake-isolated-units-default-fake-isolated-units"}: { + State: integrationtest.NewClientState(client.Healthy), + }, + integrationtest.ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "fake-isolated-units-default-fake-isolated-units-1"}: { + State: integrationtest.NewClientState(client.Healthy), + }, + }, + }, +} + type componentAndUnitNames struct { name string unitNames []string @@ -108,9 +147,35 @@ func TestDiagnosticsOptionalValues(t *testing.T) { Configure: simpleConfig2, AgentState: integrationtest.NewClientState(client.Healthy), Components: componentSetup, - After: testDiagnosticsFactory(t, diagpprof, diagCompPprof, fixture, []string{"diagnostics", "-p"}), + After: testDiagnosticsFactory(t, componentSetup, diagpprof, diagCompPprof, fixture, []string{"diagnostics", "-p"}), }) + require.NoError(t, err) +} + +func TestIsolatedUnitsDiagnosticsOptionalValues(t *testing.T) { + define.Require(t, define.Requirements{ + Group: Default, + Local: false, + }) + + fixture, err := define.NewFixture(t, define.Version()) + require.NoError(t, err) + ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(10*time.Minute)) + defer cancel() + err = fixture.Prepare(ctx, fakeComponent, fakeShipper) + require.NoError(t, err) + + diagpprof := append(diagnosticsFiles, "cpu.pprof") + diagCompPprof := append(compDiagnosticsFiles, "cpu.pprof") + + err = fixture.Run(ctx, integrationtest.State{ + Configure: 
complexIsolatedUnitsConfig, + AgentState: integrationtest.NewClientState(client.Healthy), + Components: isolatedUnitsComponentSetup, + After: testDiagnosticsFactory(t, isolatedUnitsComponentSetup, diagpprof, diagCompPprof, fixture, []string{"diagnostics", "-p"}), + }) + require.NoError(t, err) } func TestDiagnosticsCommand(t *testing.T) { @@ -131,12 +196,35 @@ func TestDiagnosticsCommand(t *testing.T) { Configure: simpleConfig2, AgentState: integrationtest.NewClientState(client.Healthy), Components: componentSetup, - After: testDiagnosticsFactory(t, diagnosticsFiles, compDiagnosticsFiles, f, []string{"diagnostics", "collect"}), + After: testDiagnosticsFactory(t, componentSetup, diagnosticsFiles, compDiagnosticsFiles, f, []string{"diagnostics", "collect"}), }) assert.NoError(t, err) } -func testDiagnosticsFactory(t *testing.T, diagFiles []string, diagCompFiles []string, fix *integrationtest.Fixture, cmd []string) func(ctx context.Context) error { +func TestIsolatedUnitsDiagnosticsCommand(t *testing.T) { + define.Require(t, define.Requirements{ + Group: Default, + Local: false, + }) + + f, err := define.NewFixture(t, define.Version()) + require.NoError(t, err) + + ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(10*time.Minute)) + defer cancel() + err = f.Prepare(ctx, fakeComponent, fakeShipper) + require.NoError(t, err) + + err = f.Run(ctx, integrationtest.State{ + Configure: complexIsolatedUnitsConfig, + AgentState: integrationtest.NewClientState(client.Healthy), + Components: isolatedUnitsComponentSetup, + After: testDiagnosticsFactory(t, isolatedUnitsComponentSetup, diagnosticsFiles, compDiagnosticsFiles, f, []string{"diagnostics", "collect"}), + }) + assert.NoError(t, err) +} + +func testDiagnosticsFactory(t *testing.T, compSetup map[string]integrationtest.ComponentState, diagFiles []string, diagCompFiles []string, fix *integrationtest.Fixture, cmd []string) func(ctx context.Context) error { return func(ctx context.Context) error { 
diagZip, err := fix.ExecDiagnostics(ctx, cmd...) @@ -144,13 +232,13 @@ func testDiagnosticsFactory(t *testing.T, diagFiles []string, diagCompFiles []st avi, err := getRunningAgentVersion(ctx, fix) require.NoError(t, err) - verifyDiagnosticArchive(t, diagZip, diagFiles, diagCompFiles, avi) + verifyDiagnosticArchive(t, compSetup, diagZip, diagFiles, diagCompFiles, avi) return nil } } -func verifyDiagnosticArchive(t *testing.T, diagArchive string, diagFiles []string, diagCompFiles []string, avi *client.Version) { +func verifyDiagnosticArchive(t *testing.T, compSetup map[string]integrationtest.ComponentState, diagArchive string, diagFiles []string, diagCompFiles []string, avi *client.Version) { // check that the archive is not an empty file stat, err := os.Stat(diagArchive) require.NoErrorf(t, err, "stat file %q failed", diagArchive) @@ -161,16 +249,8 @@ func verifyDiagnosticArchive(t *testing.T, diagArchive string, diagFiles []strin extractZipArchive(t, diagArchive, extractionDir) - expectedDiagArchiveFilePatterns := compileExpectedDiagnosticFilePatterns(avi, diagFiles, diagCompFiles, []componentAndUnitNames{ - { - name: "fake-default", - unitNames: []string{"fake-default", "fake"}, - }, - { - name: "fake-shipper-default", - unitNames: []string{"fake-shipper-default", "fake-default"}, - }, - }) + compAndUnitNames := extractComponentAndUnitNames(compSetup) + expectedDiagArchiveFilePatterns := compileExpectedDiagnosticFilePatterns(avi, diagFiles, diagCompFiles, compAndUnitNames) expectedExtractedFiles := map[string]struct{}{} for _, filePattern := range expectedDiagArchiveFilePatterns { @@ -205,6 +285,21 @@ func verifyDiagnosticArchive(t *testing.T, diagArchive string, diagFiles []strin assert.ElementsMatch(t, extractKeysFromMap(expectedExtractedFiles), extractKeysFromMap(actualExtractedDiagFiles)) } +func extractComponentAndUnitNames(compSetup map[string]integrationtest.ComponentState) []componentAndUnitNames { + comps := make([]componentAndUnitNames, 0, 
len(compSetup)) + for compName, compState := range compSetup { + unitNames := make([]string, 0, len(compState.Units)) + for unitKey := range compState.Units { + unitNames = append(unitNames, unitKey.UnitID) + } + comps = append(comps, componentAndUnitNames{ + name: compName, + unitNames: unitNames, + }) + } + return comps +} + func extractZipArchive(t *testing.T, zipFile string, dst string) { t.Helper() diff --git a/testing/integration/fake_test.go b/testing/integration/fake_test.go index dc9cf5ecaa6..4190eaf68e4 100644 --- a/testing/integration/fake_test.go +++ b/testing/integration/fake_test.go @@ -43,6 +43,34 @@ inputs: message: Healthy ` +var simpleIsolatedUnitsConfig = ` +outputs: + default: + type: fake-action-output + shipper.enabled: true +inputs: + - id: fake-isolated-units + type: fake-isolated-units + state: 1 + message: Configuring +` + +var complexIsolatedUnitsConfig = ` +outputs: + default: + type: fake-action-output + shipper.enabled: true +inputs: + - id: fake-isolated-units + type: fake-isolated-units + state: 2 + message: Healthy + - id: fake-isolated-units-1 + type: fake-isolated-units + state: 2 + message: Healthy +` + func TestFakeComponent(t *testing.T) { define.Require(t, define.Requirements{ Group: Default, @@ -103,3 +131,78 @@ func TestFakeComponent(t *testing.T) { }) require.NoError(t, err) } + +func TestFakeIsolatedUnitsComponent(t *testing.T) { + define.Require(t, define.Requirements{ + Group: Default, + Local: true, + }) + + f, err := define.NewFixture(t, define.Version()) + require.NoError(t, err) + + ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(10*time.Minute)) + defer cancel() + err = f.Prepare(ctx, fakeComponent, fakeShipper) + require.NoError(t, err) + + err = f.Run(ctx, atesting.State{ + Configure: simpleIsolatedUnitsConfig, + AgentState: atesting.NewClientState(client.Healthy), + Components: map[string]atesting.ComponentState{ + "fake-isolated-units-default-fake-isolated-units": { + State: 
atesting.NewClientState(client.Healthy), + Units: map[atesting.ComponentUnitKey]atesting.ComponentUnitState{ + atesting.ComponentUnitKey{UnitType: client.UnitTypeOutput, UnitID: "fake-isolated-units-default-fake-isolated-units"}: { + State: atesting.NewClientState(client.Healthy), + }, + atesting.ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "fake-isolated-units-default-fake-isolated-units-unit"}: { + State: atesting.NewClientState(client.Configuring), + }, + }, + }, + }, + }, atesting.State{ + Configure: complexIsolatedUnitsConfig, + AgentState: atesting.NewClientState(client.Healthy), + Components: map[string]atesting.ComponentState{ + "fake-isolated-units-default-fake-isolated-units": { + State: atesting.NewClientState(client.Healthy), + Units: map[atesting.ComponentUnitKey]atesting.ComponentUnitState{ + atesting.ComponentUnitKey{UnitType: client.UnitTypeOutput, UnitID: "fake-isolated-units-default-fake-isolated-units"}: { + State: atesting.NewClientState(client.Healthy), + }, + atesting.ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "fake-isolated-units-default-fake-isolated-units-unit"}: { + State: atesting.NewClientState(client.Healthy), + }, + }, + }, + "fake-isolated-units-default-fake-isolated-units-1": { + State: atesting.NewClientState(client.Healthy), + Units: map[atesting.ComponentUnitKey]atesting.ComponentUnitState{ + atesting.ComponentUnitKey{UnitType: client.UnitTypeOutput, UnitID: "fake-isolated-units-default-fake-isolated-units-1"}: { + State: atesting.NewClientState(client.Healthy), + }, + atesting.ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "fake-isolated-units-default-fake-isolated-units-1-unit"}: { + State: atesting.NewClientState(client.Healthy), + }, + }, + }, + "fake-shipper-default": { + State: atesting.NewClientState(client.Healthy), + Units: map[atesting.ComponentUnitKey]atesting.ComponentUnitState{ + atesting.ComponentUnitKey{UnitType: client.UnitTypeOutput, UnitID: "fake-shipper-default"}: { + State: 
atesting.NewClientState(client.Healthy), + }, + atesting.ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "fake-isolated-units-default-fake-isolated-units"}: { + State: atesting.NewClientState(client.Healthy), + }, + atesting.ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "fake-isolated-units-default-fake-isolated-units-1"}: { + State: atesting.NewClientState(client.Healthy), + }, + }, + }, + }, + }) + require.NoError(t, err) +} diff --git a/testing/integration/fakes.go b/testing/integration/fakes.go index 2364b06580d..15977e2e2d4 100644 --- a/testing/integration/fakes.go +++ b/testing/integration/fakes.go @@ -61,6 +61,16 @@ var fakeComponent = atesting.UsableComponent{ }, }, }, + { + Name: "fake-isolated-units", + Description: "A fake isolated units input", + Platforms: fakeComponentPltfs, + Shippers: []string{ + fakeShipperName, + }, + Command: &component.CommandSpec{}, + IsolateUnits: true, + }, }, }, } diff --git a/testing/integration/package_version_test.go b/testing/integration/package_version_test.go index 932866ca364..f9d8f134e0c 100644 --- a/testing/integration/package_version_test.go +++ b/testing/integration/package_version_test.go @@ -160,8 +160,13 @@ func TestComponentBuildHashInDiagnostics(t *testing.T) { diag := t.TempDir() extractZipArchive(t, diagZip, diag) - stateYAML, err := os.Open(filepath.Join(diag, "state.yaml")) + stateFilePath := filepath.Join(diag, "state.yaml") + stateYAML, err := os.Open(stateFilePath) require.NoError(t, err, "could not open diagnostics state.yaml") + defer func(stateYAML *os.File) { + err := stateYAML.Close() + assert.NoErrorf(t, err, "error closing %q", stateFilePath) + }(stateYAML) state := struct { Components []struct { diff --git a/testing/integration/upgrade_broken_package_test.go b/testing/integration/upgrade_broken_package_test.go index d416da24f41..1dd6eac4d6d 100644 --- a/testing/integration/upgrade_broken_package_test.go +++ b/testing/integration/upgrade_broken_package_test.go @@ -32,6 +32,8 
@@ func TestUpgradeBrokenPackageVersion(t *testing.T) { Sudo: true, // requires Agent installation }) + t.Skip("Skip until the first 8.15.0-SNAPSHOT is available") + ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(10*time.Minute)) defer cancel() diff --git a/version/version.go b/version/version.go index 7b092deae14..221478d33bf 100644 --- a/version/version.go +++ b/version/version.go @@ -4,5 +4,5 @@ package version -const defaultBeatVersion = "8.14.0" +const defaultBeatVersion = "8.15.0" const Agent = defaultBeatVersion