Skip to content

Commit 61fff43

Browse files
djosephsenrenovate[bot]Kavindu-Dodanbeeme1mr
authored
feat: add 'watcher' interface to file sync (#1365)
Implement fsnotify and `os.Stat` based watchers fixes: #1344 <!-- Please use this template for your pull request. --> <!-- Please use the sections that you need and delete other sections --> ## This PR Intent of this PR is to begin a conversation about fixing #1344. The approach taken is to replace the current use of `fsontify.Watcher` with a local `Watcher` interface type that describes the `fsnotify.Watcher` interface. My original take was to use fsnotify.Watcher directly as an implementation of local `Watcher`, but fsnotify's Watcher directly exposes its Error and Event channels, making it impossible to describe with an interface, so I had to create a small wrapper for `fsnotify.Watcher` to satisfy the new Watcher interface (this is fsnotify_watcher.go). From there, we implement the `Watcher` interface again, this time using `os.Stat` and `fs.FileInfo` (this is fileinfo_watcher.go). Then we change the filepath sync code to use an interface to Watcher, rather than fsnotify.Watcher directly. The new fileinfo watcher plugs right in, and nothing really needs to change in the sync. * I have not wired up configs, so the fileinfo watcher has a hard-coded 1-second polling interval, and there is no current means of selecting between them. * I've added a couple tests, to demonstrate how unit tests would work in general (we use a configurable os-stat func in the fileinfo watcher, which can be mocked for tests) * I don't have a way of testing this on Windows. I'm vaguely aware there's an upstream issue in package `fs` that may require some work-around boilerplate to make this work on windows at the moment. If yall are favorable to this approach, I'll finish wiring up configs, and flesh out the tests. I didn't want to go much further without some buy-in or feedback. ### Related Issues Fixes #1344 ### Notes See bullet-points above ### How to test go test -v ./... --------- Signed-off-by: Dave Josephsen <dave.josephsen@gmail.com> Signed-off-by: Kavindu Dodanduwa <kavindudodanduwa@gmail.com> Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> Co-authored-by: Kavindu Dodanduwa <Kavindu-Dodan@users.noreply.github.com> Co-authored-by: Michael Beemer <beeme1mr@users.noreply.github.com>
1 parent abb5ca3 commit 61fff43

File tree

6 files changed

+617
-26
lines changed

6 files changed

+617
-26
lines changed

core/pkg/sync/builder/syncbuilder.go

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"net/http"
66
"os"
77
"regexp"
8-
msync "sync"
98
"time"
109

1110
"github.com/open-feature/flagd/core/pkg/logger"
@@ -26,6 +25,8 @@ import (
2625

2726
const (
2827
syncProviderFile = "file"
28+
syncProviderFsNotify = "fsnotify"
29+
syncProviderFileInfo = "fileinfo"
2930
syncProviderGrpc = "grpc"
3031
syncProviderKubernetes = "kubernetes"
3132
syncProviderHTTP = "http"
@@ -91,8 +92,13 @@ func (sb *SyncBuilder) SyncsFromConfig(sourceConfigs []sync.SourceConfig, logger
9192
func (sb *SyncBuilder) syncFromConfig(sourceConfig sync.SourceConfig, logger *logger.Logger) (sync.ISync, error) {
9293
switch sourceConfig.Provider {
9394
case syncProviderFile:
94-
logger.Debug(fmt.Sprintf("using filepath sync-provider for: %q", sourceConfig.URI))
9595
return sb.newFile(sourceConfig.URI, logger), nil
96+
case syncProviderFsNotify:
97+
logger.Debug(fmt.Sprintf("using fsnotify sync-provider for: %q", sourceConfig.URI))
98+
return sb.newFsNotify(sourceConfig.URI, logger), nil
99+
case syncProviderFileInfo:
100+
logger.Debug(fmt.Sprintf("using fileinfo sync-provider for: %q", sourceConfig.URI))
101+
return sb.newFileInfo(sourceConfig.URI, logger), nil
96102
case syncProviderKubernetes:
97103
logger.Debug(fmt.Sprintf("using kubernetes sync-provider for: %s", sourceConfig.URI))
98104
return sb.newK8s(sourceConfig.URI, logger)
@@ -107,20 +113,46 @@ func (sb *SyncBuilder) syncFromConfig(sourceConfig sync.SourceConfig, logger *lo
107113
return sb.newGcs(sourceConfig, logger), nil
108114

109115
default:
110-
return nil, fmt.Errorf("invalid sync provider: %s, must be one of with '%s', '%s', '%s' or '%s'",
111-
sourceConfig.Provider, syncProviderFile, syncProviderKubernetes, syncProviderHTTP, syncProviderKubernetes)
116+
return nil, fmt.Errorf("invalid sync provider: %s, must be one of with '%s', '%s', '%s', %s', '%s' or '%s'",
117+
sourceConfig.Provider, syncProviderFile, syncProviderFsNotify, syncProviderFileInfo,
118+
syncProviderKubernetes, syncProviderHTTP, syncProviderKubernetes)
112119
}
113120
}
114121

122+
// newFile returns an fsinfo sync if we are in k8s or fileinfo if not
115123
func (sb *SyncBuilder) newFile(uri string, logger *logger.Logger) *file.Sync {
116-
return &file.Sync{
117-
URI: regFile.ReplaceAllString(uri, ""),
118-
Logger: logger.WithFields(
124+
switch os.Getenv("KUBERNETES_SERVICE_HOST") {
125+
case "":
126+
// no k8s service host env; use fileinfo
127+
return sb.newFileInfo(uri, logger)
128+
default:
129+
// default to fsnotify
130+
return sb.newFsNotify(uri, logger)
131+
}
132+
}
133+
134+
// return a new file.Sync that uses fsnotify under the hood
135+
func (sb *SyncBuilder) newFsNotify(uri string, logger *logger.Logger) *file.Sync {
136+
return file.NewFileSync(
137+
regFile.ReplaceAllString(uri, ""),
138+
file.FSNOTIFY,
139+
logger.WithFields(
119140
zap.String("component", "sync"),
120-
zap.String("sync", "filepath"),
141+
zap.String("sync", syncProviderFsNotify),
121142
),
122-
Mux: &msync.RWMutex{},
123-
}
143+
)
144+
}
145+
146+
// return a new file.Sync that uses os.Stat/fs.FileInfo under the hood
147+
func (sb *SyncBuilder) newFileInfo(uri string, logger *logger.Logger) *file.Sync {
148+
return file.NewFileSync(
149+
regFile.ReplaceAllString(uri, ""),
150+
file.FILEINFO,
151+
logger.WithFields(
152+
zap.String("component", "sync"),
153+
zap.String("sync", syncProviderFileInfo),
154+
),
155+
)
124156
}
125157

126158
func (sb *SyncBuilder) newK8s(uri string, logger *logger.Logger) (*kubernetes.Sync, error) {
Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
package file
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"io/fs"
8+
"os"
9+
"sync"
10+
"time"
11+
12+
"github.com/fsnotify/fsnotify"
13+
"github.com/open-feature/flagd/core/pkg/logger"
14+
)
15+
16+
// Implements file.Watcher using a timer and os.FileInfo
17+
type fileInfoWatcher struct {
18+
// Event Chan
19+
evChan chan fsnotify.Event
20+
// Errors Chan
21+
erChan chan error
22+
// logger
23+
logger *logger.Logger
24+
// Func to wrap os.Stat (injection point for test helpers)
25+
statFunc func(string) (fs.FileInfo, error)
26+
// thread-safe interface to underlying files we are watching
27+
mu sync.RWMutex
28+
watches map[string]fs.FileInfo // filename -> info
29+
}
30+
31+
// NewFsNotifyWatcher returns a new fsNotifyWatcher
32+
func NewFileInfoWatcher(ctx context.Context, logger *logger.Logger) Watcher {
33+
fiw := &fileInfoWatcher{
34+
evChan: make(chan fsnotify.Event, 32),
35+
erChan: make(chan error, 32),
36+
statFunc: getFileInfo,
37+
logger: logger,
38+
watches: make(map[string]fs.FileInfo),
39+
}
40+
fiw.run(ctx, (1 * time.Second))
41+
return fiw
42+
}
43+
44+
// fileInfoWatcher explicitly implements file.Watcher
45+
var _ Watcher = &fileInfoWatcher{}
46+
47+
// Close calls close on the underlying fsnotify.Watcher
48+
func (f *fileInfoWatcher) Close() error {
49+
// close all channels and exit
50+
close(f.evChan)
51+
close(f.erChan)
52+
return nil
53+
}
54+
55+
// Add calls Add on the underlying fsnotify.Watcher
56+
func (f *fileInfoWatcher) Add(name string) error {
57+
f.mu.Lock()
58+
defer f.mu.Unlock()
59+
60+
// exit early if name already exists
61+
if _, ok := f.watches[name]; ok {
62+
return nil
63+
}
64+
65+
info, err := f.statFunc(name)
66+
if err != nil {
67+
return err
68+
}
69+
70+
f.watches[name] = info
71+
72+
return nil
73+
}
74+
75+
// Remove calls Remove on the underlying fsnotify.Watcher
76+
func (f *fileInfoWatcher) Remove(name string) error {
77+
f.mu.Lock()
78+
defer f.mu.Unlock()
79+
80+
// no need to exit early, deleting non-existent key is a no-op
81+
delete(f.watches, name)
82+
83+
return nil
84+
}
85+
86+
// Watchlist calls watchlist on the underlying fsnotify.Watcher
87+
func (f *fileInfoWatcher) WatchList() []string {
88+
f.mu.RLock()
89+
defer f.mu.RUnlock()
90+
out := []string{}
91+
for name := range f.watches {
92+
n := name
93+
out = append(out, n)
94+
}
95+
return out
96+
}
97+
98+
// Events returns the underlying watcher's Events chan
99+
func (f *fileInfoWatcher) Events() chan fsnotify.Event {
100+
return f.evChan
101+
}
102+
103+
// Errors returns the underlying watcher's Errors chan
104+
func (f *fileInfoWatcher) Errors() chan error {
105+
return f.erChan
106+
}
107+
108+
// run is a blocking function that starts the filewatcher's timer thread
109+
func (f *fileInfoWatcher) run(ctx context.Context, s time.Duration) {
110+
// timer thread
111+
go func() {
112+
// execute update on the configured interval of time
113+
ticker := time.NewTicker(s)
114+
defer ticker.Stop()
115+
116+
for {
117+
select {
118+
case <-ctx.Done():
119+
return
120+
case <-ticker.C:
121+
if err := f.update(); err != nil {
122+
f.erChan <- err
123+
return
124+
}
125+
}
126+
}
127+
}()
128+
}
129+
130+
func (f *fileInfoWatcher) update() error {
131+
f.mu.Lock()
132+
defer f.mu.Unlock()
133+
134+
for path, info := range f.watches {
135+
newInfo, err := f.statFunc(path)
136+
if err != nil {
137+
// if the file isn't there, it must have been removed
138+
// fire off a remove event and remove it from the watches
139+
if errors.Is(err, os.ErrNotExist) {
140+
f.evChan <- fsnotify.Event{
141+
Name: path,
142+
Op: fsnotify.Remove,
143+
}
144+
delete(f.watches, path)
145+
continue
146+
}
147+
return err
148+
}
149+
150+
// if the new stat doesn't match the old stat, figure out what changed
151+
if info != newInfo {
152+
event := f.generateEvent(path, newInfo)
153+
if event != nil {
154+
f.evChan <- *event
155+
}
156+
f.watches[path] = newInfo
157+
}
158+
}
159+
return nil
160+
}
161+
162+
// generateEvent figures out what changed and generates an fsnotify.Event for it. (if we care)
163+
// file removal are handled above in the update() method
164+
func (f *fileInfoWatcher) generateEvent(path string, newInfo fs.FileInfo) *fsnotify.Event {
165+
info := f.watches[path]
166+
switch {
167+
// new mod time is more recent than old mod time, generate a write event
168+
case newInfo.ModTime().After(info.ModTime()):
169+
return &fsnotify.Event{
170+
Name: path,
171+
Op: fsnotify.Write,
172+
}
173+
// the file modes changed, generate a chmod event
174+
case info.Mode() != newInfo.Mode():
175+
return &fsnotify.Event{
176+
Name: path,
177+
Op: fsnotify.Chmod,
178+
}
179+
// nothing changed that we care about
180+
default:
181+
return nil
182+
}
183+
}
184+
185+
// getFileInfo returns the fs.FileInfo for the given path
186+
func getFileInfo(path string) (fs.FileInfo, error) {
187+
f, err := os.Open(path)
188+
if err != nil {
189+
return nil, fmt.Errorf("error from os.Open(%s): %w", path, err)
190+
}
191+
192+
info, err := f.Stat()
193+
if err != nil {
194+
return info, fmt.Errorf("error from fs.Stat(%s): %w", path, err)
195+
}
196+
197+
if err := f.Close(); err != nil {
198+
return info, fmt.Errorf("err from fs.Close(%s): %w", path, err)
199+
}
200+
201+
return info, nil
202+
}

0 commit comments

Comments
 (0)