Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add unit tests for workunitbase and wrap fsnotify library calls #866

Merged
merged 3 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/workceptor/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (cw *commandUnit) runCommand(cmd *exec.Cmd) error {
})
}()
go cmdWaiter(cmd, doneChan)
go cw.monitorLocalStatus()
go cw.MonitorLocalStatus()

return nil
}
Expand Down Expand Up @@ -263,7 +263,7 @@ func (cw *commandUnit) Restart() error {
// Job never started - mark it failed
cw.UpdateBasicStatus(WorkStateFailed, "Pending at restart", stdoutSize(cw.UnitDir()))
}
go cw.monitorLocalStatus()
go cw.MonitorLocalStatus()

return nil
}
Expand Down Expand Up @@ -331,7 +331,7 @@ func (cfg CommandWorkerCfg) NewWorker(w *Workceptor, unitID string, workType str
baseParams: cfg.Params,
allowRuntimeParams: cfg.AllowRuntimeParams,
}
cw.BaseWorkUnit.Init(w, unitID, workType)
cw.BaseWorkUnit.Init(w, unitID, workType, FileSystem{}, nil)

return cw
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/workceptor/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func newCommandWorker(w *Workceptor, unitID string, workType string) WorkUnit {
baseParams: "foo",
allowRuntimeParams: true,
}
cw.BaseWorkUnit.Init(w, unitID, workType)
cw.BaseWorkUnit.Init(w, unitID, workType, FileSystem{}, nil)

return cw
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/workceptor/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -1272,7 +1272,7 @@ func (kw *kubeUnit) startOrRestart() error {
} else {
go kw.runWorkUsingLogger()
}
go kw.monitorLocalStatus()
go kw.MonitorLocalStatus()

return nil
}
Expand Down Expand Up @@ -1388,7 +1388,7 @@ func (cfg KubeWorkerCfg) NewWorker(w *Workceptor, unitID string, workType string
deletePodOnRestart: cfg.DeletePodOnRestart,
namePrefix: fmt.Sprintf("%s-", strings.ToLower(cfg.WorkType)),
}
ku.BaseWorkUnit.Init(w, unitID, workType)
ku.BaseWorkUnit.Init(w, unitID, workType, FileSystem{}, nil)

return ku
}
Expand Down
14 changes: 14 additions & 0 deletions pkg/workceptor/mock_workceptor/stdio_utils.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

76 changes: 76 additions & 0 deletions pkg/workceptor/mock_workceptor/workunitbase.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/workceptor/python.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (cfg workPythonCfg) NewWorker(w *Workceptor, unitID string, workType string
function: cfg.Function,
config: cfg.Config,
}
cw.BaseWorkUnit.Init(w, unitID, workType)
cw.BaseWorkUnit.Init(w, unitID, workType, FileSystem{}, nil)

return cw
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/workceptor/remote_work.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ func (rw *remoteUnit) Release(force bool) error {

func newRemoteWorker(w *Workceptor, unitID, workType string) WorkUnit {
rw := &remoteUnit{logger: w.nc.GetLogger()}
rw.BaseWorkUnit.Init(w, unitID, workType)
rw.BaseWorkUnit.Init(w, unitID, workType, FileSystem{}, nil)
red := &remoteExtraData{}
red.RemoteParams = make(map[string]string)
rw.status.ExtraData = red
Expand Down
6 changes: 6 additions & 0 deletions pkg/workceptor/stdio_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type FileSystemer interface {
OpenFile(name string, flag int, perm os.FileMode) (*os.File, error)
Stat(name string) (os.FileInfo, error)
Open(name string) (*os.File, error)
RemoveAll(path string) error
}

// FileSystem represents the real filesystem.
Expand All @@ -36,6 +37,11 @@ func (FileSystem) Open(name string) (*os.File, error) {
return os.Open(name)
}

// RemoveAll removes path and any children it contains.
func (FileSystem) RemoveAll(path string) error {
return os.RemoveAll(path)
}

// FileWriteCloser wraps io.WriteCloser.
type FileWriteCloser interface {
io.WriteCloser
Expand Down
90 changes: 62 additions & 28 deletions pkg/workceptor/workunitbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,29 @@ const (
WorkStateCanceled = 4
)

// WatcherWrapper is wrapping the fsnofity Watcher struct and exposing the Event chan within.
type WatcherWrapper interface {
AaronH88 marked this conversation as resolved.
Show resolved Hide resolved
Add(name string) error
Close() error
EventChannel() chan fsnotify.Event
}

type RealWatcher struct {
watcher *fsnotify.Watcher
}

func (rw *RealWatcher) Add(name string) error {
return rw.watcher.Add(name)
}

func (rw *RealWatcher) Close() error {
return rw.watcher.Close()
}

func (rw *RealWatcher) EventChannel() chan fsnotify.Event {
return rw.watcher.Events
}

// IsComplete returns true if a given WorkState indicates the job is finished.
func IsComplete(workState int) bool {
return workState == WorkStateSucceeded || workState == WorkStateFailed
Expand Down Expand Up @@ -74,10 +97,12 @@ type BaseWorkUnit struct {
lastUpdateErrorLock *sync.RWMutex
ctx context.Context
cancel context.CancelFunc
fs FileSystemer
watcher WatcherWrapper
}

// Init initializes the basic work unit data, in memory only.
func (bwu *BaseWorkUnit) Init(w *Workceptor, unitID string, workType string) {
func (bwu *BaseWorkUnit) Init(w *Workceptor, unitID string, workType string, fs FileSystemer, watcher WatcherWrapper) {
AaronH88 marked this conversation as resolved.
Show resolved Hide resolved
bwu.w = w
bwu.status.State = WorkStatePending
bwu.status.Detail = "Unit Created"
Expand All @@ -90,6 +115,17 @@ func (bwu *BaseWorkUnit) Init(w *Workceptor, unitID string, workType string) {
bwu.statusLock = &sync.RWMutex{}
bwu.lastUpdateErrorLock = &sync.RWMutex{}
bwu.ctx, bwu.cancel = context.WithCancel(w.ctx)
bwu.fs = fs
if watcher != nil {
bwu.watcher = watcher
} else {
watcher, err := fsnotify.NewWatcher()
if err == nil {
bwu.watcher = &RealWatcher{watcher: watcher}
} else {
bwu.watcher = nil
}
}
}

// Error logs message with unitID prepended.
Expand Down Expand Up @@ -340,33 +376,29 @@ func (bwu *BaseWorkUnit) LastUpdateError() error {
return bwu.lastUpdateError
}

// monitorLocalStatus watches a unit dir and keeps the in-memory workUnit up to date with status changes.
func (bwu *BaseWorkUnit) monitorLocalStatus() {
// MonitorLocalStatus watches a unit dir and keeps the in-memory workUnit up to date with status changes.
func (bwu *BaseWorkUnit) MonitorLocalStatus() {
statusFile := path.Join(bwu.UnitDir(), "status")
watcher, err := fsnotify.NewWatcher()
if err == nil {
err = watcher.Add(statusFile)
var watcherEvents chan fsnotify.Event
watcherEvents = make(chan fsnotify.Event)

if bwu.watcher != nil {
err := bwu.watcher.Add(statusFile)
if err == nil {
defer func() {
_ = watcher.Close()
_ = bwu.watcher.Close()
}()
watcherEvents = bwu.watcher.EventChannel()
} else {
_ = watcher.Close()
watcher = nil
_ = bwu.watcher.Close()
bwu.watcher = nil
}
} else {
watcher = nil
}
fi, err := os.Stat(statusFile)
fi, err := bwu.fs.Stat(statusFile)
if err != nil {
fi = nil
}
var watcherEvents chan fsnotify.Event
if watcher == nil {
watcherEvents = make(chan fsnotify.Event)
} else {
watcherEvents = watcher.Events
}

loop:
for {
select {
Expand All @@ -380,14 +412,12 @@ loop:
}
}
case <-time.After(time.Second):
newFi, err := os.Stat(statusFile)
if err == nil {
if fi == nil || fi.ModTime() != newFi.ModTime() {
fi = newFi
err = bwu.Load()
if err != nil {
bwu.w.nc.GetLogger().Error("Error reading %s: %s", statusFile, err)
}
newFi, err := bwu.fs.Stat(statusFile)
if err == nil && (fi == nil || fi.ModTime() != newFi.ModTime()) {
fi = newFi
err = bwu.Load()
if err != nil {
bwu.w.nc.GetLogger().Error("Error reading %s: %s", statusFile, err)
}
}
}
Expand Down Expand Up @@ -425,7 +455,7 @@ func (bwu *BaseWorkUnit) Release(force bool) error {
defer bwu.statusLock.Unlock()
attemptsLeft := 3
for {
err := os.RemoveAll(bwu.UnitDir())
err := bwu.fs.RemoveAll(bwu.UnitDir())
if force {
break
} else if err != nil {
Expand All @@ -451,11 +481,15 @@ func (bwu *BaseWorkUnit) Release(force bool) error {
return nil
}

func (bwu *BaseWorkUnit) CancelContext() {
bwu.cancel()
}

// =============================================================================================== //

func newUnknownWorker(w *Workceptor, unitID string, workType string) WorkUnit {
uu := &unknownUnit{}
uu.BaseWorkUnit.Init(w, unitID, workType)
uu.BaseWorkUnit.Init(w, unitID, workType, FileSystem{}, nil)

return uu
}
Expand Down
Loading