Skip to content

Commit

Permalink
feat: try rjeczalik/notify
Browse files Browse the repository at this point in the history
Signed-off-by: black-desk <me@black-desk.cn>
  • Loading branch information
black-desk committed Oct 20, 2023
1 parent ad5f56e commit 9770260
Show file tree
Hide file tree
Showing 7 changed files with 231 additions and 4 deletions.
8 changes: 4 additions & 4 deletions cmd/cgtproxy/cmd/providers.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package cmd

import (
"github.com/black-desk/cgtproxy/pkg/cgfsmon"
"github.com/black-desk/cgtproxy/pkg/cgfsmon2"
"github.com/black-desk/cgtproxy/pkg/cgtproxy"
"github.com/black-desk/cgtproxy/pkg/cgtproxy/config"
"github.com/black-desk/cgtproxy/pkg/interfaces"
Expand Down Expand Up @@ -86,9 +86,9 @@ func provideCgrougMontior(
) (
interfaces.CGroupMonitor, error,
) {
return cgfsmon.New(
cgfsmon.WithCgroupRoot(cgroupRoot),
cgfsmon.WithLogger(logger),
return cgfsmon2.New(
cgfsmon2.WithCgroupRoot(cgroupRoot),
cgfsmon2.WithLogger(logger),
)
}

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ require (
github.com/leodido/go-urn v1.2.4 // indirect
github.com/mdlayher/netlink v1.7.2 // indirect
github.com/mdlayher/socket v0.5.0 // indirect
github.com/rjeczalik/notify v0.9.3
github.com/spf13/pflag v1.0.5 // indirect
github.com/vishvananda/netns v0.0.4 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ github.com/onsi/gomega v1.28.0 h1:i2rg/p9n/UqIDAMFUJ6qIUUMcsqOuUHgbpbu235Vr1c=
github.com/onsi/gomega v1.28.0/go.mod h1:A1H2JE76sI14WIP57LMKj7FVfCHx3g3BcZVjJG8bjX8=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rjeczalik/notify v0.9.3 h1:6rJAzHTGKXGj76sbRgDiDcYj/HniypXmSJo1SWakZeY=
github.com/rjeczalik/notify v0.9.3/go.mod h1:gF3zSOrafR9DQEWSE8TjfI9NkooDxbyT4UgRGKZA0lc=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
Expand Down Expand Up @@ -98,6 +100,7 @@ golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ=
golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.0.0-20180926160741-c2ed4eda69e7/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190606203320-7fc4e5ec1444/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190912141932-bc967efca4b8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
10 changes: 10 additions & 0 deletions pkg/cgfsmon2/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package cgfsmon2

import "errors"

var (
ErrContextMissing = errors.New("context is missing.")
ErrCGroupRootNotFound = errors.New("cgroup v2 file system mount point is missing.")
ErrLoggerMissing = errors.New("logger is missing.")
ErrUnderlingWatcherExited = errors.New("underling file system watcher has exited.")
)
76 changes: 76 additions & 0 deletions pkg/cgfsmon2/new.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package cgfsmon2

import (
"github.com/black-desk/cgtproxy/pkg/cgtproxy/config"
"github.com/black-desk/cgtproxy/pkg/types"
. "github.com/black-desk/lib/go/errwrap"
"github.com/rjeczalik/notify"
"go.uber.org/zap"
)

type CGroupFSMonitor struct {
eventsOut chan types.CGroupEvent
eventsIn chan notify.EventInfo

root config.CGroupRoot
log *zap.SugaredLogger
}

func New(opts ...Opt) (ret *CGroupFSMonitor, err error) {
defer Wrap(&err, "create filesystem watcher")

w := &CGroupFSMonitor{}

w.eventsOut = make(chan types.CGroupEvent)
w.eventsIn = make(chan notify.EventInfo)

for i := range opts {
w, err = opts[i](w)
if err != nil {
return
}
}

if w.log == nil {
w.log = zap.NewNop().Sugar()
}

if w.root == "" {
err = ErrCGroupRootNotFound
return
}

ret = w

w.log.Debugw("Create a new filesystem watcher.")

return
}

type Opt func(w *CGroupFSMonitor) (ret *CGroupFSMonitor, err error)

func WithCgroupRoot(root config.CGroupRoot) Opt {
return func(w *CGroupFSMonitor) (ret *CGroupFSMonitor, err error) {
if root == "" {
err = ErrCGroupRootNotFound
return
}

w.root = root
ret = w
return
}
}

func WithLogger(log *zap.SugaredLogger) Opt {
return func(w *CGroupFSMonitor) (ret *CGroupFSMonitor, err error) {
if log == nil {
err = ErrLoggerMissing
return
}

w.log = log
ret = w
return
}
}
82 changes: 82 additions & 0 deletions pkg/cgfsmon2/private.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package cgfsmon2

import (
"context"
"errors"
"io/fs"
"path/filepath"
"strings"

"github.com/black-desk/cgtproxy/pkg/types"
)

func (m *CGroupFSMonitor) walkFn(ctx context.Context) func(path string, d fs.DirEntry, err error) error {
return func(path string, d fs.DirEntry, err error) error {
if err != nil {
if errors.Is(err, fs.ErrNotExist) {
m.log.Debug(
"Cgroup had been removed.",
"path", path,
)
err = nil
}
m.log.Errorw(
"Errors occurred while first time going through cgroupfs.",
"path", path,
"error", err,
)
err = nil
}

if !d.IsDir() {
return nil
}

cgEvent := &types.CGroupEvent{
Path: path,
EventType: types.CgroupEventTypeNew,
}

err = m.send(ctx, cgEvent)
if err != nil {
return err
}

return nil
}
}

func (m *CGroupFSMonitor) walk(ctx context.Context, path string) {
err := filepath.WalkDir(path, m.walkFn(ctx))
if err == nil {
return
}

return
}

func (m *CGroupFSMonitor) send(ctx context.Context, cgEvent *types.CGroupEvent) (err error) {
path := strings.TrimRight(cgEvent.Path, "/")
cgEvent.Path = path

if cgEvent.Path == string(m.root) {
// NOTE: Ignore cgroup root.
return nil
}

m.log.Debugw("New cgroup envent.",
"event", cgEvent,
)

select {
case <-ctx.Done():
err = ctx.Err()
return
case m.eventsOut <- *cgEvent:
m.log.Debugw("Cgroup event sent.",
"path", path,
)
}

return
}
55 changes: 55 additions & 0 deletions pkg/cgfsmon2/public.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package cgfsmon2

import (
"context"

"github.com/black-desk/cgtproxy/pkg/types"
. "github.com/black-desk/lib/go/errwrap"
"github.com/rjeczalik/notify"
)

func (w *CGroupFSMonitor) Events() <-chan types.CGroupEvent {
return w.eventsOut
}

func (w *CGroupFSMonitor) Run(ctx context.Context) (err error) {
defer Wrap(&err, "running filesystem watcher")
defer close(w.eventsOut)
defer notify.Stop(w.eventsIn)
defer close(w.eventsIn)

err = notify.Watch(string(w.root)+"/...", w.eventsIn, notify.Create, notify.Remove)
if err != nil {
return
}

w.log.Info("Going through cgroupfs first time...")
w.walk(ctx, string(w.root))
w.log.Info("Going through cgroupfs first time...Done.")

LOOP:
for {
select {
case <-ctx.Done():
break LOOP
case event := <-w.eventsIn:
eventType := types.CgroupEventTypeNew
if event.Event() == notify.InDelete {
eventType = types.CgroupEventTypeDelete
}

err = w.send(ctx, &types.CGroupEvent{
Path: event.Path(),
EventType: eventType,
})
}
}

<-ctx.Done()
err = ctx.Err()
if err != nil {
return
}

return
}

0 comments on commit 9770260

Please sign in to comment.