diff --git a/pkg/internal/ebpf/common/common.go b/pkg/internal/ebpf/common/common.go index ad2203ba8..c8d22d987 100644 --- a/pkg/internal/ebpf/common/common.go +++ b/pkg/internal/ebpf/common/common.go @@ -43,6 +43,16 @@ var IntegrityModeOverride = false var ActiveNamespaces = make(map[uint32]uint32) +// These represent unique traffic control (tc) handles to be supplied as +// arguments to RegisterIngress, RegisterEgress or RegisterTC. They all start +// from the 0xb310 offset in simplistic attempt to avoid collisions with +// 3rdparty handles. +const ( + NetollyTCHandle = 0xb310 + iota + HTTPTracerTCHandle + TCTracerTCHandle +) + // ProbeDesc holds the information of the instrumentation points of a given // function/symbol type ProbeDesc struct { diff --git a/pkg/internal/ebpf/common/tc_linux.go b/pkg/internal/ebpf/common/tc_linux.go index c26f94d3e..0ffa0379c 100644 --- a/pkg/internal/ebpf/common/tc_linux.go +++ b/pkg/internal/ebpf/common/tc_linux.go @@ -8,6 +8,7 @@ import ( "fmt" "io/fs" "log/slog" + "time" "github.com/vishvananda/netlink" "golang.org/x/sys/unix" @@ -21,10 +22,7 @@ type TCLinks struct { IngressFilter *netlink.BpfFilter } -func WatchAndRegisterTC(ctx context.Context, channelBufferLen int, register func(iface ifaces.Interface), log *slog.Logger) { - informer := ifaces.NewWatcher(channelBufferLen) - registerer := ifaces.NewRegisterer(informer, channelBufferLen) - +func StartTCMonitorLoop(ctx context.Context, registerer *ifaces.Registerer, register func(iface ifaces.Interface), log *slog.Logger) { log.Debug("subscribing for network interface events") ifaceEvents, err := registerer.Subscribe(ctx) if err != nil { @@ -54,10 +52,25 @@ func WatchAndRegisterTC(ctx context.Context, channelBufferLen int, register func }() } -func RegisterTC(iface ifaces.Interface, egressFD, ingressFD int, log *slog.Logger) *TCLinks { - links := TCLinks{} +// Convenience function +func WatchAndRegisterTC(ctx context.Context, channelBufferLen int, register func(iface ifaces.Interface), log *slog.Logger) { + log.Debug("listening for new interfaces: use watching") + + informer := ifaces.NewWatcher(channelBufferLen) + registerer := ifaces.NewRegisterer(informer, channelBufferLen) + StartTCMonitorLoop(ctx, registerer, register, log) +} + +// Convenience function +func PollAndRegisterTC(ctx context.Context, channelBufferLen int, register func(iface ifaces.Interface), period time.Duration, log *slog.Logger) { + log.Debug("listening for new interfaces: use polling", "period", period) - // Load pre-compiled programs and maps into the kernel, and rewrites the configuration + informer := ifaces.NewPoller(period, channelBufferLen) + registerer := ifaces.NewRegisterer(informer, channelBufferLen) + StartTCMonitorLoop(ctx, registerer, register, log) +} + +func GetClsactQdisc(iface ifaces.Interface, log *slog.Logger) *netlink.GenericQdisc { ipvlan, err := netlink.LinkByIndex(iface.Index) if err != nil { log.Error("failed to lookup ipvlan device", "index", iface.Index, "name", iface.Name, "error", err) @@ -80,15 +93,29 @@ func RegisterTC(iface ifaces.Interface, egressFD, ingressFD int, log *slog.Logge return nil } } - links.Qdisc = qdisc - egressFilter, err := registerEgress(ipvlan, egressFD) + return qdisc +} + +func RegisterTC(iface ifaces.Interface, egressFD int, egressHandle uint32, egressName string, + ingressFD int, ingressHandle uint32, ingressName string, log *slog.Logger) *TCLinks { + links := TCLinks{ + Qdisc: GetClsactQdisc(iface, log), + } + + if links.Qdisc == nil { + return nil + } + + linkIndex := links.Qdisc.QdiscAttrs.LinkIndex + + egressFilter, err := RegisterEgress(linkIndex, egressFD, egressHandle, egressName) if err != nil { log.Error("failed to install egress filters", "error", err) } links.EgressFilter = egressFilter - ingressFilter, err := registerIngress(ipvlan, ingressFD) + ingressFilter, err := RegisterIngress(linkIndex, ingressFD, ingressHandle, ingressName) if err != nil { log.Error("failed to install ingres filters", "error", err) } @@ -97,62 +124,44 @@ func RegisterTC(iface ifaces.Interface, egressFD, ingressFD int, log *slog.Logge return &links } -func registerEgress(ipvlan netlink.Link, egressFD int) (*netlink.BpfFilter, error) { - // Fetch events on egress - egressAttrs := netlink.FilterAttrs{ - LinkIndex: ipvlan.Attrs().Index, - Parent: netlink.HANDLE_MIN_EGRESS, - Handle: netlink.MakeHandle(0, 1), - Protocol: unix.ETH_P_ALL, - Priority: 1, - } - egressFilter := &netlink.BpfFilter{ - FilterAttrs: egressAttrs, - Fd: egressFD, - Name: "tc/tc_http_egress", - DirectAction: true, - } - if err := netlink.FilterDel(egressFilter); err == nil { - log.Warn("egress filter already existed. Deleted it") - } - if err := netlink.FilterAdd(egressFilter); err != nil { - if errors.Is(err, fs.ErrExist) { - log.Warn("egress filter already exists. Ignoring", "error", err) - } else { - return nil, fmt.Errorf("failed to create egress filter: %w", err) - } - } +func RegisterEgress(linkIndex int, egressFD int, handle uint32, name string) (*netlink.BpfFilter, error) { + return registerFilter(linkIndex, egressFD, handle, netlink.HANDLE_MIN_EGRESS, name) +} - return egressFilter, nil +func RegisterIngress(linkIndex int, ingressFD int, handle uint32, name string) (*netlink.BpfFilter, error) { + return registerFilter(linkIndex, ingressFD, handle, netlink.HANDLE_MIN_INGRESS, name) } -func registerIngress(ipvlan netlink.Link, ingressFD int) (*netlink.BpfFilter, error) { +func registerFilter(linkIndex int, fd int, handle uint32, parent uint32, name string) (*netlink.BpfFilter, error) { // Fetch events on ingress - ingressAttrs := netlink.FilterAttrs{ - LinkIndex: ipvlan.Attrs().Index, - Parent: netlink.HANDLE_MIN_INGRESS, - Handle: netlink.MakeHandle(0, 1), + attrs := netlink.FilterAttrs{ + LinkIndex: linkIndex, + Parent: parent, + Handle: handle, Protocol: unix.ETH_P_ALL, Priority: 1, } - ingressFilter := &netlink.BpfFilter{ - FilterAttrs: ingressAttrs, - Fd: ingressFD, - Name: "tc/tc_http_ingress", + + filter := &netlink.BpfFilter{ + FilterAttrs: attrs, + Fd: fd, + Name: name, DirectAction: true, } - if err := netlink.FilterDel(ingressFilter); err == nil { - log.Warn("ingress filter already existed. Deleted it") + + if err := netlink.FilterDel(filter); err == nil { + log.Warn("filter already existed. Deleted it", "filter", name, "iface", linkIndex) } - if err := netlink.FilterAdd(ingressFilter); err != nil { + + if err := netlink.FilterAdd(filter); err != nil { if errors.Is(err, fs.ErrExist) { - log.Warn("ingress filter already exists. Ignoring", "error", err) + log.Warn("filter already exists. Ignoring", "error", err) } else { - return nil, fmt.Errorf("failed to create ingress filter: %w", err) + return nil, fmt.Errorf("failed to create filter: %w", err) } } - return ingressFilter, nil + return filter, nil } // doIgnoreNoDev runs the provided syscall over the provided device and ignores the error diff --git a/pkg/internal/ebpf/httptracer/httptracer.go b/pkg/internal/ebpf/httptracer/httptracer.go index 0ee4c7ebc..83cd4ebae 100644 --- a/pkg/internal/ebpf/httptracer/httptracer.go +++ b/pkg/internal/ebpf/httptracer/httptracer.go @@ -143,7 +143,11 @@ func (p *Tracer) Run(ctx context.Context, _ chan<- []request.Span) { } func (p *Tracer) registerTC(iface ifaces.Interface) { - links := ebpfcommon.RegisterTC(iface, p.bpfObjects.BeylaTcHttpEgress.FD(), p.bpfObjects.BeylaTcHttpIngress.FD(), p.log) + links := ebpfcommon.RegisterTC(iface, + p.bpfObjects.BeylaTcHttpEgress.FD(), ebpfcommon.HTTPTracerTCHandle, "tc/tc_http_egress", + p.bpfObjects.BeylaTcHttpIngress.FD(), ebpfcommon.HTTPTracerTCHandle, "tc/tc_http_ingress", + p.log) + if links == nil { return } diff --git a/pkg/internal/ebpf/tctracer/tctracer.go b/pkg/internal/ebpf/tctracer/tctracer.go index 8da7968ba..5e5862966 100644 --- a/pkg/internal/ebpf/tctracer/tctracer.go +++ b/pkg/internal/ebpf/tctracer/tctracer.go @@ -154,7 +154,11 @@ func (p *Tracer) Run(ctx context.Context, _ chan<- []request.Span) { } func (p *Tracer) registerTC(iface ifaces.Interface) { - links := ebpfcommon.RegisterTC(iface, p.bpfObjects.BeylaAppEgress.FD(), p.bpfObjects.BeylaAppIngress.FD(), p.log) + links := ebpfcommon.RegisterTC(iface, + p.bpfObjects.BeylaAppEgress.FD(), ebpfcommon.TCTracerTCHandle, "tc/tc_egress", + p.bpfObjects.BeylaAppIngress.FD(), ebpfcommon.TCTracerTCHandle, "tc/tc_ingress", + p.log) + if links == nil { return } diff --git a/pkg/internal/netolly/agent/agent.go b/pkg/internal/netolly/agent/agent.go index 617b6d3eb..ada16e167 100644 --- a/pkg/internal/netolly/agent/agent.go +++ b/pkg/internal/netolly/agent/agent.go @@ -28,6 +28,7 @@ import ( "github.com/cilium/ebpf/ringbuf" "github.com/grafana/beyla/pkg/beyla" + ebpfcommon "github.com/grafana/beyla/pkg/internal/ebpf/common" "github.com/grafana/beyla/pkg/internal/netolly/ebpf" "github.com/grafana/beyla/pkg/internal/netolly/flow" "github.com/grafana/beyla/pkg/internal/netolly/ifaces" @@ -89,7 +90,7 @@ type Flows struct { ctxInfo *global.ContextInfo // input data providers - interfaces ifaces.Informer + registerer *ifaces.Registerer filter interfaceFilter ebpf ebpfFlowFetcher @@ -193,7 +194,7 @@ func flowsAgent( return &Flows{ ctxInfo: ctxInfo, ebpf: fetcher, - interfaces: registerer, + registerer: registerer, filter: filter, cfg: cfg, mapTracer: mapTracer, @@ -259,32 +260,7 @@ func (f *Flows) Status() Status { func (f *Flows) interfacesManager(ctx context.Context) error { slog := alog().With("function", "interfacesManager") - slog.Debug("subscribing for network interface events") - ifaceEvents, err := f.interfaces.Subscribe(ctx) - if err != nil { - return fmt.Errorf("instantiating interfaces' informer: %w", err) - } - - go func() { - for { - select { - case <-ctx.Done(): - slog.Debug("stopping interfaces' listener") - return - case event := <-ifaceEvents: - slog.Debug("received event", "event", event) - switch event.Type { - case ifaces.EventAdded: - f.onInterfaceAdded(event.Interface) - case ifaces.EventDeleted: - // qdiscs, ingress and egress filters are automatically deleted so we don't need to - // specifically detach them from the ebpfFetcher - default: - slog.Warn("unknown event type", "event", event) - } - } - } - }() + ebpfcommon.StartTCMonitorLoop(ctx, f.registerer, f.onInterfaceAdded, slog) return nil } diff --git a/pkg/internal/netolly/agent/pipeline_test.go b/pkg/internal/netolly/agent/pipeline_test.go index 96b3dee28..40061ca1c 100644 --- a/pkg/internal/netolly/agent/pipeline_test.go +++ b/pkg/internal/netolly/agent/pipeline_test.go @@ -56,7 +56,7 @@ func TestFilter(t *testing.T) { }, }}, }, - interfaces: fakeInterfacesInformer{}, + registerer: ifaces.NewRegisterer(fakeInterfacesInformer{}, 10), interfaceNamer: func(_ int) string { return "fakeiface" }, } diff --git a/pkg/internal/netolly/ebpf/tracer.go b/pkg/internal/netolly/ebpf/tracer.go index a9d00b4a8..1077d352c 100644 --- a/pkg/internal/netolly/ebpf/tracer.go +++ b/pkg/internal/netolly/ebpf/tracer.go @@ -24,15 +24,14 @@ import ( "context" "errors" "fmt" - "io/fs" "log/slog" "strings" "github.com/cilium/ebpf/ringbuf" "github.com/cilium/ebpf/rlimit" "github.com/vishvananda/netlink" - "golang.org/x/sys/unix" + ebpfcommon "github.com/grafana/beyla/pkg/internal/ebpf/common" "github.com/grafana/beyla/pkg/internal/netolly/ifaces" ) @@ -40,7 +39,6 @@ import ( //go:generate $BPF2GO -cc $BPF_CLANG -cflags $BPF_CFLAGS -type flow_metrics_t -type flow_id_t -type flow_record_t -target amd64,arm64 Net ../../../../bpf/flows.c -- -I../../../../bpf/headers const ( - qdiscType = "clsact" // constants defined in flows.c as "volatile const" constSampling = "sampling" constTraceMessages = "trace_messages" @@ -126,105 +124,48 @@ func NewFlowFetcher( // before exiting. func (m *FlowFetcher) Register(iface ifaces.Interface) error { ilog := m.log.With("interface", iface) - // Load pre-compiled programs and maps into the kernel, and rewrites the configuration - ipvlan, err := netlink.LinkByIndex(iface.Index) - if err != nil { - return fmt.Errorf("failed to lookup ipvlan device %d (%s): %w", iface.Index, iface.Name, err) - } - qdiscAttrs := netlink.QdiscAttrs{ - LinkIndex: ipvlan.Attrs().Index, - Handle: netlink.MakeHandle(0xffff, 0), - Parent: netlink.HANDLE_CLSACT, - } - qdisc := &netlink.GenericQdisc{ - QdiscAttrs: qdiscAttrs, - QdiscType: qdiscType, - } - if err := netlink.QdiscDel(qdisc); err == nil { - ilog.Warn("qdisc clsact already existed. Deleted it") + + if !m.enableEgress && !m.enableIngress { + ilog.Debug("both egress and ingress have been disabled in the configuration, skipping") + return nil } - if err := netlink.QdiscAdd(qdisc); err != nil { - if errors.Is(err, fs.ErrExist) { - ilog.Warn("qdisc clsact already exists. Ignoring", "error", err) - } else { - // nolint:errorlint - return fmt.Errorf("failed to create clsact qdisc on %d (%s): %T %w", iface.Index, iface.Name, err, err) - } + + qdisc := ebpfcommon.GetClsactQdisc(iface, ilog) + + if qdisc == nil { + return fmt.Errorf("failed to obtain a clsact qdisc") } + m.qdiscs[iface] = qdisc - if err := m.registerEgress(iface, ipvlan); err != nil { - return err - } + linkIndex := qdisc.QdiscAttrs.LinkIndex - return m.registerIngress(iface, ipvlan) -} + if m.enableEgress { + filter, err := ebpfcommon.RegisterEgress(linkIndex, + m.objects.BeylaEgressFlowParse.FD(), ebpfcommon.NetollyTCHandle, "tc/egress_flow_parse") -func (m *FlowFetcher) registerEgress(iface ifaces.Interface, ipvlan netlink.Link) error { - ilog := m.log.With("interface", iface) - if !m.enableEgress { + if err != nil { + return fmt.Errorf("failed to install egress filters: %w", err) + } + + m.egressFilters[iface] = filter + } else { ilog.Debug("ignoring egress traffic, according to user configuration") - return nil - } - // Fetch events on egress - egressAttrs := netlink.FilterAttrs{ - LinkIndex: ipvlan.Attrs().Index, - Parent: netlink.HANDLE_MIN_EGRESS, - Handle: netlink.MakeHandle(0, 1), - Protocol: 3, - Priority: 1, - } - egressFilter := &netlink.BpfFilter{ - FilterAttrs: egressAttrs, - Fd: m.objects.BeylaEgressFlowParse.FD(), - Name: "tc/egress_flow_parse", - DirectAction: true, - } - if err := netlink.FilterDel(egressFilter); err == nil { - ilog.Warn("egress filter already existed. Deleted it") } - if err := netlink.FilterAdd(egressFilter); err != nil { - if errors.Is(err, fs.ErrExist) { - ilog.Warn("egress filter already exists. Ignoring", "error", err) - } else { - return fmt.Errorf("failed to create egress filter: %w", err) + + if m.enableIngress { + filter, err := ebpfcommon.RegisterIngress(linkIndex, + m.objects.BeylaIngressFlowParse.FD(), ebpfcommon.NetollyTCHandle, "tc/ingress_flow_parse") + + if err != nil { + return fmt.Errorf("failed to install ingress filters: %w", err) } - } - m.egressFilters[iface] = egressFilter - return nil -} -func (m *FlowFetcher) registerIngress(iface ifaces.Interface, ipvlan netlink.Link) error { - ilog := m.log.With("interface", iface) - if !m.enableIngress { + m.ingressFilters[iface] = filter + } else { ilog.Debug("ignoring ingress traffic, according to user configuration") - return nil - } - // Fetch events on ingress - ingressAttrs := netlink.FilterAttrs{ - LinkIndex: ipvlan.Attrs().Index, - Parent: netlink.HANDLE_MIN_INGRESS, - Handle: netlink.MakeHandle(0, 1), - Protocol: unix.ETH_P_ALL, - Priority: 1, - } - ingressFilter := &netlink.BpfFilter{ - FilterAttrs: ingressAttrs, - Fd: m.objects.BeylaIngressFlowParse.FD(), - Name: "tc/ingress_flow_parse", - DirectAction: true, - } - if err := netlink.FilterDel(ingressFilter); err == nil { - ilog.Warn("ingress filter already existed. Deleted it") } - if err := netlink.FilterAdd(ingressFilter); err != nil { - if errors.Is(err, fs.ErrExist) { - ilog.Warn("ingress filter already exists. Ignoring", "error", err) - } else { - return fmt.Errorf("failed to create ingress filter: %w", err) - } - } - m.ingressFilters[iface] = ingressFilter + return nil } @@ -235,6 +176,12 @@ func (m *FlowFetcher) Close() error { log := tlog() log.Debug("unregistering eBPF objects") + ebpfcommon.CloseTCLinks(m.qdiscs, m.egressFilters, m.ingressFilters, log) + + m.egressFilters = map[ifaces.Interface]*netlink.BpfFilter{} + m.ingressFilters = map[ifaces.Interface]*netlink.BpfFilter{} + m.qdiscs = map[ifaces.Interface]*netlink.GenericQdisc{} + var errs []error // m.ringbufReader.Read is a blocking operation, so we need to close the ring buffer // from another goroutine to avoid the system not being able to exit if there @@ -244,39 +191,21 @@ func (m *FlowFetcher) Close() error { errs = append(errs, err) } } + if m.objects != nil { errs = append(errs, m.closeObjects()...) } - for iface, ef := range m.egressFilters { - log.Debug("deleting egress filter", "interface", iface) - if err := doIgnoreNoDev(netlink.FilterDel, netlink.Filter(ef)); err != nil { - errs = append(errs, fmt.Errorf("deleting egress filter: %w", err)) - } - } - m.egressFilters = map[ifaces.Interface]*netlink.BpfFilter{} - for iface, igf := range m.ingressFilters { - log.Debug("deleting ingress filter", "interface", iface) - if err := doIgnoreNoDev(netlink.FilterDel, netlink.Filter(igf)); err != nil { - errs = append(errs, fmt.Errorf("deleting ingress filter: %w", err)) - } - } - m.ingressFilters = map[ifaces.Interface]*netlink.BpfFilter{} - for iface, qd := range m.qdiscs { - log.Debug("deleting Qdisc", "interface", iface) - if err := doIgnoreNoDev(netlink.QdiscDel, netlink.Qdisc(qd)); err != nil { - errs = append(errs, fmt.Errorf("deleting qdisc: %w", err)) - } - } - m.qdiscs = map[ifaces.Interface]*netlink.GenericQdisc{} - if len(errs) == 0 { - return nil - } var errStrings []string for _, err := range errs { errStrings = append(errStrings, err.Error()) } - return errors.New(`errors: "` + strings.Join(errStrings, `", "`) + `"`) + + if len(errs) > 0 { + return errors.New(`errors: "` + strings.Join(errStrings, `", "`) + `"`) + } + + return nil } func (m *FlowFetcher) closeObjects() []error { @@ -297,29 +226,6 @@ func (m *FlowFetcher) closeObjects() []error { return errs } -// doIgnoreNoDev runs the provided syscall over the provided device and ignores the error -// if the cause is a non-existing device (just logs the error as debug). -// If the agent is deployed as part of the Network Metrics pipeline, normally -// undeploying the FlowCollector could cause the agent to try to remove resources -// from Pods that have been removed immediately before (e.g. flowlogs-pipeline or the -// console plugin), so we avoid logging some errors that would unnecessarily raise the -// user's attention. -// This function uses generics because the set of provided functions accept different argument -// types. -func doIgnoreNoDev[T any](sysCall func(T) error, dev T) error { - if err := sysCall(dev); err != nil { - if errors.Is(err, unix.ENODEV) { - tlog().Error("can't delete. Ignore this error if other pods or interfaces "+ - " are also being deleted at this moment. For example, if you are undeploying "+ - " a FlowCollector or Deployment where this agent is part of", - "error", err) - } else { - return err - } - } - return nil -} - func (m *FlowFetcher) ReadRingBuf() (ringbuf.Record, error) { return m.ringbufReader.Read() }