diff --git a/cmd/cgtproxy/cmd/providers.go b/cmd/cgtproxy/cmd/providers.go index bbc57e0..dcdbb53 100644 --- a/cmd/cgtproxy/cmd/providers.go +++ b/cmd/cgtproxy/cmd/providers.go @@ -1,7 +1,7 @@ package cmd import ( - "github.com/black-desk/cgtproxy/pkg/cgfsmon" + "github.com/black-desk/cgtproxy/pkg/cgfsmon2" "github.com/black-desk/cgtproxy/pkg/cgtproxy" "github.com/black-desk/cgtproxy/pkg/cgtproxy/config" "github.com/black-desk/cgtproxy/pkg/interfaces" @@ -86,9 +86,9 @@ func provideCgrougMontior( ) ( interfaces.CGroupMonitor, error, ) { - return cgfsmon.New( - cgfsmon.WithCgroupRoot(cgroupRoot), - cgfsmon.WithLogger(logger), + return cgfsmon2.New( + cgfsmon2.WithCgroupRoot(cgroupRoot), + cgfsmon2.WithLogger(logger), ) } diff --git a/go.mod b/go.mod index 4ee8603..dcd0e52 100644 --- a/go.mod +++ b/go.mod @@ -37,6 +37,7 @@ require ( github.com/leodido/go-urn v1.2.4 // indirect github.com/mdlayher/netlink v1.7.2 // indirect github.com/mdlayher/socket v0.5.0 // indirect + github.com/rjeczalik/notify v0.9.3 github.com/spf13/pflag v1.0.5 // indirect github.com/vishvananda/netns v0.0.4 // indirect go.uber.org/multierr v1.11.0 // indirect diff --git a/go.sum b/go.sum index f4e9076..f78bc6b 100644 --- a/go.sum +++ b/go.sum @@ -56,6 +56,8 @@ github.com/onsi/gomega v1.28.0 h1:i2rg/p9n/UqIDAMFUJ6qIUUMcsqOuUHgbpbu235Vr1c= github.com/onsi/gomega v1.28.0/go.mod h1:A1H2JE76sI14WIP57LMKj7FVfCHx3g3BcZVjJG8bjX8= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rjeczalik/notify v0.9.3 h1:6rJAzHTGKXGj76sbRgDiDcYj/HniypXmSJo1SWakZeY= +github.com/rjeczalik/notify v0.9.3/go.mod h1:gF3zSOrafR9DQEWSE8TjfI9NkooDxbyT4UgRGKZA0lc= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= @@ -98,6 +100,7 @@ golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ= golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= +golang.org/x/sys v0.0.0-20180926160741-c2ed4eda69e7/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190606203320-7fc4e5ec1444/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190912141932-bc967efca4b8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/pkg/cgfsmon2/error.go b/pkg/cgfsmon2/error.go new file mode 100644 index 0000000..477a45d --- /dev/null +++ b/pkg/cgfsmon2/error.go @@ -0,0 +1,10 @@ +package cgfsmon2 + +import "errors" + +var ( + ErrContextMissing = errors.New("context is missing.") + ErrCGroupRootNotFound = errors.New("cgroup v2 file system mount point is missing.") + ErrLoggerMissing = errors.New("logger is missing.") + ErrUnderlingWatcherExited = errors.New("underling file system watcher has exited.") +) diff --git a/pkg/cgfsmon2/new.go b/pkg/cgfsmon2/new.go new file mode 100644 index 0000000..8cf1b1d --- /dev/null +++ b/pkg/cgfsmon2/new.go @@ -0,0 +1,76 @@ +package cgfsmon2 + +import ( + "github.com/black-desk/cgtproxy/pkg/cgtproxy/config" + "github.com/black-desk/cgtproxy/pkg/types" + . "github.com/black-desk/lib/go/errwrap" + "github.com/rjeczalik/notify" + "go.uber.org/zap" +) + +type CGroupFSMonitor struct { + eventsOut chan types.CGroupEvent + eventsIn chan notify.EventInfo + + root config.CGroupRoot + log *zap.SugaredLogger +} + +func New(opts ...Opt) (ret *CGroupFSMonitor, err error) { + defer Wrap(&err, "create filesystem watcher") + + w := &CGroupFSMonitor{} + + w.eventsOut = make(chan types.CGroupEvent) + w.eventsIn = make(chan notify.EventInfo) + + for i := range opts { + w, err = opts[i](w) + if err != nil { + return + } + } + + if w.log == nil { + w.log = zap.NewNop().Sugar() + } + + if w.root == "" { + err = ErrCGroupRootNotFound + return + } + + ret = w + + w.log.Debugw("Create a new filesystem watcher.") + + return +} + +type Opt func(w *CGroupFSMonitor) (ret *CGroupFSMonitor, err error) + +func WithCgroupRoot(root config.CGroupRoot) Opt { + return func(w *CGroupFSMonitor) (ret *CGroupFSMonitor, err error) { + if root == "" { + err = ErrCGroupRootNotFound + return + } + + w.root = root + ret = w + return + } +} + +func WithLogger(log *zap.SugaredLogger) Opt { + return func(w *CGroupFSMonitor) (ret *CGroupFSMonitor, err error) { + if log == nil { + err = ErrLoggerMissing + return + } + + w.log = log + ret = w + return + } +} diff --git a/pkg/cgfsmon2/private.go b/pkg/cgfsmon2/private.go new file mode 100644 index 0000000..9089e4e --- /dev/null +++ b/pkg/cgfsmon2/private.go @@ -0,0 +1,82 @@ +package cgfsmon2 + +import ( + "context" + "errors" + "io/fs" + "path/filepath" + "strings" + + "github.com/black-desk/cgtproxy/pkg/types" +) + +func (m *CGroupFSMonitor) walkFn(ctx context.Context) func(path string, d fs.DirEntry, err error) error { + return func(path string, d fs.DirEntry, err error) error { + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + m.log.Debug( + "Cgroup had been removed.", + "path", path, + ) + err = nil + } + m.log.Errorw( + "Errors occurred while first time going through cgroupfs.", + "path", path, + "error", err, + ) + err = nil + } + + if !d.IsDir() { + return nil + } + + cgEvent := &types.CGroupEvent{ + Path: path, + EventType: types.CgroupEventTypeNew, + } + + err = m.send(ctx, cgEvent) + if err != nil { + return err + } + + return nil + } +} + +func (m *CGroupFSMonitor) walk(ctx context.Context, path string) { + err := filepath.WalkDir(path, m.walkFn(ctx)) + if err == nil { + return + } + + return +} + +func (m *CGroupFSMonitor) send(ctx context.Context, cgEvent *types.CGroupEvent) (err error) { + path := strings.TrimRight(cgEvent.Path, "/") + cgEvent.Path = path + + if cgEvent.Path == string(m.root) { + // NOTE: Ignore cgroup root. + return nil + } + + m.log.Debugw("New cgroup envent.", + "event", cgEvent, + ) + + select { + case <-ctx.Done(): + err = ctx.Err() + return + case m.eventsOut <- *cgEvent: + m.log.Debugw("Cgroup event sent.", + "path", path, + ) + } + + return +} diff --git a/pkg/cgfsmon2/public.go b/pkg/cgfsmon2/public.go new file mode 100644 index 0000000..230a112 --- /dev/null +++ b/pkg/cgfsmon2/public.go @@ -0,0 +1,55 @@ +package cgfsmon2 + +import ( + "context" + + "github.com/black-desk/cgtproxy/pkg/types" + . "github.com/black-desk/lib/go/errwrap" + "github.com/rjeczalik/notify" +) + +func (w *CGroupFSMonitor) Events() <-chan types.CGroupEvent { + return w.eventsOut +} + +func (w *CGroupFSMonitor) Run(ctx context.Context) (err error) { + defer Wrap(&err, "running filesystem watcher") + defer close(w.eventsOut) + defer notify.Stop(w.eventsIn) + defer close(w.eventsIn) + + err = notify.Watch(string(w.root)+"/...", w.eventsIn, notify.Create, notify.Remove) + if err != nil { + return + } + + w.log.Info("Going through cgroupfs first time...") + w.walk(ctx, string(w.root)) + w.log.Info("Going through cgroupfs first time...Done.") + +LOOP: + for { + select { + case <-ctx.Done(): + break LOOP + case event := <-w.eventsIn: + eventType := types.CgroupEventTypeNew + if event.Event() == notify.InDelete { + eventType = types.CgroupEventTypeDelete + } + + err = w.send(ctx, &types.CGroupEvent{ + Path: event.Path(), + EventType: eventType, + }) + } + } + + <-ctx.Done() + err = ctx.Err() + if err != nil { + return + } + + return +}