Skip to content

Commit

Permalink
Fix Alerts Receiver (#1023)
Browse files Browse the repository at this point in the history
Due to missing `for`, the alerts receiver was terminated after receiving one alert. This led to alerts channel to fill up and `AddAlert` being blocking call, which resulted in entire Policy Circuit being blocked.
  • Loading branch information
kwapik authored and kklimonda-fn committed Dec 13, 2022
1 parent 116cb2f commit 598002c
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 10 deletions.
13 changes: 13 additions & 0 deletions pkg/otelcollector/alertsreceiver/alertsreceiver_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package alertsreceiver

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestAlertsreceiver(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Alertsreceiver Suite")
}
2 changes: 1 addition & 1 deletion pkg/otelcollector/alertsreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func createLogsReceiver(
consumer consumer.Logs,
) (component.LogsReceiver, error) {
cfg := rConf.(*Config)
p, err := newProcessor(cfg)
p, err := newReceiver(cfg)
if err != nil {
return nil, err
}
Expand Down
20 changes: 11 additions & 9 deletions pkg/otelcollector/alertsreceiver/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type alertsReceiver struct {
shutdown func()
}

func newProcessor(cfg *Config) (*alertsReceiver, error) {
func newReceiver(cfg *Config) (*alertsReceiver, error) {
p := &alertsReceiver{
cfg: cfg,
}
Expand Down Expand Up @@ -48,13 +48,15 @@ func (p *alertsReceiver) registerLogsConsumer(lc consumer.Logs) error {
}

func (p *alertsReceiver) run(ctx context.Context) {
select {
case alert := <-p.cfg.alerter.AlertsChan():
err := p.logsConsumer.ConsumeLogs(ctx, alert.AsLogs())
// We do not care much about those errors. Alerts can be dropped sometimes,
// they are sent all the time anyway.
log.Autosample().Debug().Err(err).Msg("ConsumeLogs failed")
case <-ctx.Done():
return
for {
select {
case alert := <-p.cfg.alerter.AlertsChan():
err := p.logsConsumer.ConsumeLogs(ctx, alert.AsLogs())
// We do not care much about those errors. Alerts can be dropped sometimes,
// they are sent all the time anyway.
log.Autosample().Debug().Err(err).Msg("ConsumeLogs failed")
case <-ctx.Done():
return
}
}
}
110 changes: 110 additions & 0 deletions pkg/otelcollector/alertsreceiver/receiver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package alertsreceiver

import (
"context"
"fmt"
"sync"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"

"github.com/fluxninja/aperture/pkg/alerts"
)

var _ = Describe("Alerts receiver", func() {
var (
alerter alerts.Alerter
tc *testConsumer
receiver *alertsReceiver
)

BeforeEach(func() {
alerter = alerts.NewSimpleAlerter(1)
cfg := &Config{
alerter: alerter,
}
var err error
receiver, err = newReceiver(cfg)
Expect(err).NotTo(HaveOccurred())
tc = newTestConsumer()
receiver.logsConsumer = tc

go func() {
err := receiver.Start(nil, nil)
Expect(err).NotTo(HaveOccurred())
}()
})

AfterEach(func() {
err := receiver.Shutdown(nil)
Expect(err).NotTo(HaveOccurred())
})

It("consumes single alert properly", func() {
alert := alerts.NewAlert(alerts.WithName("foo"))
Eventually(func() error {
alerter.AddAlert(alert)
return nil
}).Should(Succeed())

Eventually(func() int {
return len(tc.ReceivedLogs())
}).Should(Equal(1))

Expect(tc.ReceivedLogs()[0]).To(Equal(alert.AsLogs()))
})

It("consumes multiple alerts properly", func() {
alertsObj := []*alerts.Alert{}
for i := 0; i < 10; i++ {
alertsObj = append(alertsObj, alerts.NewAlert(
alerts.WithName(fmt.Sprintf("foo%v", i)),
))
}

for i := 0; i < 10; i++ {
Eventually(func() error {
alerter.AddAlert(alertsObj[i])
return nil
}).Should(Succeed())
}

Eventually(func() int {
return len(tc.ReceivedLogs())
}).Should(Equal(10))

for i := 0; i < 10; i++ {
Expect(tc.ReceivedLogs()).To(ContainElement(alertsObj[i].AsLogs()))
}
})
})

func newTestConsumer() *testConsumer {
return &testConsumer{receivedLogs: []plog.Logs{}}
}

type testConsumer struct {
sync.Mutex
receivedLogs []plog.Logs
}

func (t *testConsumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
t.Lock()
defer t.Unlock()
t.receivedLogs = append(t.receivedLogs, ld)
return nil
}

func (t *testConsumer) ReceivedLogs() []plog.Logs {
t.Lock()
defer t.Unlock()
return t.receivedLogs
}

func (t *testConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{
MutatesData: false,
}
}

0 comments on commit 598002c

Please sign in to comment.