From 598002c3e18ef4dbe3d579c25470fbc3911d3b88 Mon Sep 17 00:00:00 2001 From: Krzysztof Kwapisiewicz Date: Tue, 13 Dec 2022 10:50:45 +0100 Subject: [PATCH] Fix Alerts Receiver (#1023) Due to a missing `for` loop, the alerts receiver terminated after receiving one alert. This caused the alerts channel to fill up and `AddAlert` to become a blocking call, which resulted in the entire Policy Circuit being blocked. --- .../alertsreceiver_suite_test.go | 13 +++ pkg/otelcollector/alertsreceiver/factory.go | 2 +- pkg/otelcollector/alertsreceiver/processor.go | 20 ++-- .../alertsreceiver/receiver_test.go | 110 ++++++++++++++++++ 4 files changed, 135 insertions(+), 10 deletions(-) create mode 100644 pkg/otelcollector/alertsreceiver/alertsreceiver_suite_test.go create mode 100644 pkg/otelcollector/alertsreceiver/receiver_test.go diff --git a/pkg/otelcollector/alertsreceiver/alertsreceiver_suite_test.go b/pkg/otelcollector/alertsreceiver/alertsreceiver_suite_test.go new file mode 100644 index 0000000000..9c53c3d507 --- /dev/null +++ b/pkg/otelcollector/alertsreceiver/alertsreceiver_suite_test.go @@ -0,0 +1,13 @@ +package alertsreceiver + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" +) + +func TestAlertsreceiver(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Alertsreceiver Suite") +} diff --git a/pkg/otelcollector/alertsreceiver/factory.go b/pkg/otelcollector/alertsreceiver/factory.go index ac01eed683..1e4b1f4194 100644 --- a/pkg/otelcollector/alertsreceiver/factory.go +++ b/pkg/otelcollector/alertsreceiver/factory.go @@ -39,7 +39,7 @@ func createLogsReceiver( consumer consumer.Logs, ) (component.LogsReceiver, error) { cfg := rConf.(*Config) - p, err := newProcessor(cfg) + p, err := newReceiver(cfg) if err != nil { return nil, err } diff --git a/pkg/otelcollector/alertsreceiver/processor.go b/pkg/otelcollector/alertsreceiver/processor.go index 93ca62d005..4a63196eaf 100644 --- a/pkg/otelcollector/alertsreceiver/processor.go +++ b/pkg/otelcollector/alertsreceiver/processor.go @@ -17,7 +17,7 @@ type alertsReceiver struct { shutdown func() } -func newProcessor(cfg *Config) (*alertsReceiver, error) { +func newReceiver(cfg *Config) (*alertsReceiver, error) { p := &alertsReceiver{ cfg: cfg, } @@ -48,13 +48,15 @@ func (p *alertsReceiver) registerLogsConsumer(lc consumer.Logs) error { } func (p *alertsReceiver) run(ctx context.Context) { - select { - case alert := <-p.cfg.alerter.AlertsChan(): - err := p.logsConsumer.ConsumeLogs(ctx, alert.AsLogs()) - // We do not care much about those errors. Alerts can be dropped sometimes, - // they are sent all the time anyway. - log.Autosample().Debug().Err(err).Msg("ConsumeLogs failed") - case <-ctx.Done(): - return + for { + select { + case alert := <-p.cfg.alerter.AlertsChan(): + err := p.logsConsumer.ConsumeLogs(ctx, alert.AsLogs()) + // We do not care much about those errors. Alerts can be dropped sometimes, + // they are sent all the time anyway. 
+ log.Autosample().Debug().Err(err).Msg("ConsumeLogs failed") + case <-ctx.Done(): + return + } } } diff --git a/pkg/otelcollector/alertsreceiver/receiver_test.go b/pkg/otelcollector/alertsreceiver/receiver_test.go new file mode 100644 index 0000000000..e91a0c89e4 --- /dev/null +++ b/pkg/otelcollector/alertsreceiver/receiver_test.go @@ -0,0 +1,110 @@ +package alertsreceiver + +import ( + "context" + "fmt" + "sync" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/plog" + + "github.com/fluxninja/aperture/pkg/alerts" +) + +var _ = Describe("Alerts receiver", func() { + var ( + alerter alerts.Alerter + tc *testConsumer + receiver *alertsReceiver + ) + + BeforeEach(func() { + alerter = alerts.NewSimpleAlerter(1) + cfg := &Config{ + alerter: alerter, + } + var err error + receiver, err = newReceiver(cfg) + Expect(err).NotTo(HaveOccurred()) + tc = newTestConsumer() + receiver.logsConsumer = tc + + go func() { + err := receiver.Start(nil, nil) + Expect(err).NotTo(HaveOccurred()) + }() + }) + + AfterEach(func() { + err := receiver.Shutdown(nil) + Expect(err).NotTo(HaveOccurred()) + }) + + It("consumes single alert properly", func() { + alert := alerts.NewAlert(alerts.WithName("foo")) + Eventually(func() error { + alerter.AddAlert(alert) + return nil + }).Should(Succeed()) + + Eventually(func() int { + return len(tc.ReceivedLogs()) + }).Should(Equal(1)) + + Expect(tc.ReceivedLogs()[0]).To(Equal(alert.AsLogs())) + }) + + It("consumes multiple alerts properly", func() { + alertsObj := []*alerts.Alert{} + for i := 0; i < 10; i++ { + alertsObj = append(alertsObj, alerts.NewAlert( + alerts.WithName(fmt.Sprintf("foo%v", i)), + )) + } + + for i := 0; i < 10; i++ { + Eventually(func() error { + alerter.AddAlert(alertsObj[i]) + return nil + }).Should(Succeed()) + } + + Eventually(func() int { + return len(tc.ReceivedLogs()) + }).Should(Equal(10)) + + for i := 0; i < 10; i++ { + 
Expect(tc.ReceivedLogs()).To(ContainElement(alertsObj[i].AsLogs())) + } + }) +}) + +func newTestConsumer() *testConsumer { + return &testConsumer{receivedLogs: []plog.Logs{}} +} + +type testConsumer struct { + sync.Mutex + receivedLogs []plog.Logs +} + +func (t *testConsumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error { + t.Lock() + defer t.Unlock() + t.receivedLogs = append(t.receivedLogs, ld) + return nil +} + +func (t *testConsumer) ReceivedLogs() []plog.Logs { + t.Lock() + defer t.Unlock() + return t.receivedLogs +} + +func (t *testConsumer) Capabilities() consumer.Capabilities { + return consumer.Capabilities{ + MutatesData: false, + } +}