From 04926e5e01bab1b4ca360d8f16dfe33a3ef4e1a2 Mon Sep 17 00:00:00 2001 From: Aaron Hetherington Date: Tue, 3 Oct 2023 17:56:32 +0100 Subject: [PATCH 1/3] Add unit tests for workunitbase and wrap fsnotify library calls --- pkg/workceptor/command.go | 6 +- pkg/workceptor/json_test.go | 2 +- pkg/workceptor/kubernetes.go | 4 +- pkg/workceptor/mock_workceptor/stdio_utils.go | 14 + .../mock_workceptor/workunitbase.go | 76 ++++ pkg/workceptor/python.go | 2 +- pkg/workceptor/remote_work.go | 2 +- pkg/workceptor/stdio_utils.go | 6 + pkg/workceptor/workunitbase.go | 70 +++- pkg/workceptor/workunitbase_test.go | 366 ++++++++++++++++++ 10 files changed, 524 insertions(+), 24 deletions(-) create mode 100644 pkg/workceptor/mock_workceptor/workunitbase.go create mode 100644 pkg/workceptor/workunitbase_test.go diff --git a/pkg/workceptor/command.go b/pkg/workceptor/command.go index b2c709195..664a87004 100644 --- a/pkg/workceptor/command.go +++ b/pkg/workceptor/command.go @@ -219,7 +219,7 @@ func (cw *commandUnit) runCommand(cmd *exec.Cmd) error { }) }() go cmdWaiter(cmd, doneChan) - go cw.monitorLocalStatus() + go cw.MonitorLocalStatus() return nil } @@ -263,7 +263,7 @@ func (cw *commandUnit) Restart() error { // Job never started - mark it failed cw.UpdateBasicStatus(WorkStateFailed, "Pending at restart", stdoutSize(cw.UnitDir())) } - go cw.monitorLocalStatus() + go cw.MonitorLocalStatus() return nil } @@ -331,7 +331,7 @@ func (cfg CommandWorkerCfg) NewWorker(w *Workceptor, unitID string, workType str baseParams: cfg.Params, allowRuntimeParams: cfg.AllowRuntimeParams, } - cw.BaseWorkUnit.Init(w, unitID, workType) + cw.BaseWorkUnit.Init(w, unitID, workType, FileSystem{}, nil) return cw } diff --git a/pkg/workceptor/json_test.go b/pkg/workceptor/json_test.go index ac5d34303..de4d1f4da 100644 --- a/pkg/workceptor/json_test.go +++ b/pkg/workceptor/json_test.go @@ -22,7 +22,7 @@ func newCommandWorker(w *Workceptor, unitID string, workType string) WorkUnit { baseParams: "foo", allowRuntimeParams: true, } - cw.BaseWorkUnit.Init(w, unitID, workType) + cw.BaseWorkUnit.Init(w, unitID, workType, FileSystem{}, nil) return cw } diff --git a/pkg/workceptor/kubernetes.go b/pkg/workceptor/kubernetes.go index ba50e1d1d..14d277b0d 100644 --- a/pkg/workceptor/kubernetes.go +++ b/pkg/workceptor/kubernetes.go @@ -1272,7 +1272,7 @@ func (kw *kubeUnit) startOrRestart() error { } else { go kw.runWorkUsingLogger() } - go kw.monitorLocalStatus() + go kw.MonitorLocalStatus() return nil } @@ -1388,7 +1388,7 @@ func (cfg KubeWorkerCfg) NewWorker(w *Workceptor, unitID string, workType string deletePodOnRestart: cfg.DeletePodOnRestart, namePrefix: fmt.Sprintf("%s-", strings.ToLower(cfg.WorkType)), } - ku.BaseWorkUnit.Init(w, unitID, workType) + ku.BaseWorkUnit.Init(w, unitID, workType, FileSystem{}, nil) return ku } diff --git a/pkg/workceptor/mock_workceptor/stdio_utils.go b/pkg/workceptor/mock_workceptor/stdio_utils.go index 36a24e987..7634027fb 100644 --- a/pkg/workceptor/mock_workceptor/stdio_utils.go +++ b/pkg/workceptor/mock_workceptor/stdio_utils.go @@ -78,6 +78,20 @@ func (mr *MockFileSystemerMockRecorder) Open(name interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Open", reflect.TypeOf((*MockFileSystemer)(nil).Open), name) } +// RemoveAll mocks base method +func (m *MockFileSystemer) RemoveAll(path string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RemoveAll", path) + ret0, _ := ret[0].(error) + return ret0 +} + +// RemoveAll indicates an expected call of RemoveAll +func (mr *MockFileSystemerMockRecorder) RemoveAll(path interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveAll", reflect.TypeOf((*MockFileSystemer)(nil).RemoveAll), path) +} + // MockFileWriteCloser is a mock of FileWriteCloser interface type MockFileWriteCloser struct { ctrl *gomock.Controller diff --git a/pkg/workceptor/mock_workceptor/workunitbase.go b/pkg/workceptor/mock_workceptor/workunitbase.go new file mode 100644 index 000000000..21232d401 --- /dev/null +++ b/pkg/workceptor/mock_workceptor/workunitbase.go @@ -0,0 +1,76 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: pkg/workceptor/workunitbase.go + +// Package mock_workceptor is a generated GoMock package. +package mock_workceptor + +import ( + fsnotify "github.com/fsnotify/fsnotify" + gomock "github.com/golang/mock/gomock" + reflect "reflect" +) + +// MockWatcherWrapper is a mock of WatcherWrapper interface +type MockWatcherWrapper struct { + ctrl *gomock.Controller + recorder *MockWatcherWrapperMockRecorder +} + +// MockWatcherWrapperMockRecorder is the mock recorder for MockWatcherWrapper +type MockWatcherWrapperMockRecorder struct { + mock *MockWatcherWrapper +} + +// NewMockWatcherWrapper creates a new mock instance +func NewMockWatcherWrapper(ctrl *gomock.Controller) *MockWatcherWrapper { + mock := &MockWatcherWrapper{ctrl: ctrl} + mock.recorder = &MockWatcherWrapperMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockWatcherWrapper) EXPECT() *MockWatcherWrapperMockRecorder { + return m.recorder +} + +// Add mocks base method +func (m *MockWatcherWrapper) Add(name string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Add", name) + ret0, _ := ret[0].(error) + return ret0 +} + +// Add indicates an expected call of Add +func (mr *MockWatcherWrapperMockRecorder) Add(name interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Add", reflect.TypeOf((*MockWatcherWrapper)(nil).Add), name) +} + +// Close mocks base method +func (m *MockWatcherWrapper) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close +func (mr *MockWatcherWrapperMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockWatcherWrapper)(nil).Close)) +} + +// EventChannel mocks base method +func (m *MockWatcherWrapper) EventChannel() chan fsnotify.Event { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "EventChannel") + ret0, _ := ret[0].(chan fsnotify.Event) + return ret0 +} + +// EventChannel indicates an expected call of EventChannel +func (mr *MockWatcherWrapperMockRecorder) EventChannel() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EventChannel", reflect.TypeOf((*MockWatcherWrapper)(nil).EventChannel)) +} diff --git a/pkg/workceptor/python.go b/pkg/workceptor/python.go index 2aa2ffeef..98a363fdf 100644 --- a/pkg/workceptor/python.go +++ b/pkg/workceptor/python.go @@ -63,7 +63,7 @@ func (cfg workPythonCfg) NewWorker(w *Workceptor, unitID string, workType string function: cfg.Function, config: cfg.Config, } - cw.BaseWorkUnit.Init(w, unitID, workType) + cw.BaseWorkUnit.Init(w, unitID, workType, FileSystem{}, nil) return cw } diff --git a/pkg/workceptor/remote_work.go b/pkg/workceptor/remote_work.go index 89dd5158f..69acfd3a1 100644 --- a/pkg/workceptor/remote_work.go +++ b/pkg/workceptor/remote_work.go @@ -680,7 +680,7 @@ func (rw *remoteUnit) Release(force bool) error { func newRemoteWorker(w *Workceptor, unitID, workType string) WorkUnit { rw := &remoteUnit{logger: w.nc.GetLogger()} - rw.BaseWorkUnit.Init(w, unitID, workType) + rw.BaseWorkUnit.Init(w, unitID, workType, FileSystem{}, nil) red := &remoteExtraData{} red.RemoteParams = make(map[string]string) rw.status.ExtraData = red diff --git a/pkg/workceptor/stdio_utils.go b/pkg/workceptor/stdio_utils.go index 0f2b5ee46..99e50a0b1 100644 --- a/pkg/workceptor/stdio_utils.go +++ b/pkg/workceptor/stdio_utils.go @@ -16,6 +16,7 @@ type FileSystemer interface { OpenFile(name string, flag int, perm os.FileMode) (*os.File, error) Stat(name string) (os.FileInfo, error) Open(name string) (*os.File, error) + RemoveAll(path string) error } // FileSystem represents the real filesystem. @@ -36,6 +37,11 @@ func (FileSystem) Open(name string) (*os.File, error) { return os.Open(name) } +// RemoveAll removes path and any children it contains. +func (FileSystem) RemoveAll(path string) error { + return os.RemoveAll(path) +} + // FileWriteCloser wraps io.WriteCloser. type FileWriteCloser interface { io.WriteCloser diff --git a/pkg/workceptor/workunitbase.go b/pkg/workceptor/workunitbase.go index 62cb2ce59..ee5088d89 100644 --- a/pkg/workceptor/workunitbase.go +++ b/pkg/workceptor/workunitbase.go @@ -32,6 +32,28 @@ const ( WorkStateCanceled = 4 ) +type WatcherWrapper interface { + Add(name string) error + Close() error + EventChannel() chan fsnotify.Event +} + +type RealWatcher struct { + watcher *fsnotify.Watcher +} + +func (rw *RealWatcher) Add(name string) error { + return rw.watcher.Add(name) +} + +func (rw *RealWatcher) Close() error { + return rw.watcher.Close() +} + +func (rw *RealWatcher) EventChannel() chan fsnotify.Event { + return rw.watcher.Events +} + // IsComplete returns true if a given WorkState indicates the job is finished. func IsComplete(workState int) bool { return workState == WorkStateSucceeded || workState == WorkStateFailed @@ -74,10 +96,12 @@ type BaseWorkUnit struct { lastUpdateErrorLock *sync.RWMutex ctx context.Context cancel context.CancelFunc + fs FileSystemer + watcher WatcherWrapper } // Init initializes the basic work unit data, in memory only. -func (bwu *BaseWorkUnit) Init(w *Workceptor, unitID string, workType string) { +func (bwu *BaseWorkUnit) Init(w *Workceptor, unitID string, workType string, fs FileSystemer, watcher WatcherWrapper) { bwu.w = w bwu.status.State = WorkStatePending bwu.status.Detail = "Unit Created" @@ -90,6 +114,17 @@ func (bwu *BaseWorkUnit) Init(w *Workceptor, unitID string, workType string) { bwu.statusLock = &sync.RWMutex{} bwu.lastUpdateErrorLock = &sync.RWMutex{} bwu.ctx, bwu.cancel = context.WithCancel(w.ctx) + bwu.fs = fs + if watcher != nil { + bwu.watcher = watcher + } else { + watcher, err := fsnotify.NewWatcher() + if err == nil { + bwu.watcher = &RealWatcher{watcher: watcher} + } else { + bwu.watcher = nil + } + } } // Error logs message with unitID prepended. @@ -340,32 +375,31 @@ func (bwu *BaseWorkUnit) LastUpdateError() error { return bwu.lastUpdateError } -// monitorLocalStatus watches a unit dir and keeps the in-memory workUnit up to date with status changes. -func (bwu *BaseWorkUnit) monitorLocalStatus() { +// MonitorLocalStatus watches a unit dir and keeps the in-memory workUnit up to date with status changes. +func (bwu *BaseWorkUnit) MonitorLocalStatus() { statusFile := path.Join(bwu.UnitDir(), "status") - watcher, err := fsnotify.NewWatcher() - if err == nil { - err = watcher.Add(statusFile) + if bwu.watcher != nil { + err := bwu.watcher.Add(statusFile) if err == nil { defer func() { - _ = watcher.Close() + _ = bwu.watcher.Close() }() } else { - _ = watcher.Close() - watcher = nil + _ = bwu.watcher.Close() + bwu.watcher = nil } } else { - watcher = nil + bwu.watcher = nil } - fi, err := os.Stat(statusFile) + fi, err := bwu.fs.Stat(statusFile) if err != nil { fi = nil } var watcherEvents chan fsnotify.Event - if watcher == nil { + if bwu.watcher == nil { watcherEvents = make(chan fsnotify.Event) } else { - watcherEvents = watcher.Events + watcherEvents = bwu.watcher.EventChannel() } loop: for { @@ -380,7 +414,7 @@ loop: } } case <-time.After(time.Second): - newFi, err := os.Stat(statusFile) + newFi, err := bwu.fs.Stat(statusFile) if err == nil { if fi == nil || fi.ModTime() != newFi.ModTime() { fi = newFi @@ -425,7 +459,7 @@ func (bwu *BaseWorkUnit) Release(force bool) error { defer bwu.statusLock.Unlock() attemptsLeft := 3 for { - err := os.RemoveAll(bwu.UnitDir()) + err := bwu.fs.RemoveAll(bwu.UnitDir()) if force { break } else if err != nil { @@ -451,11 +485,15 @@ func (bwu *BaseWorkUnit) Release(force bool) error { return nil } +func (bwu *BaseWorkUnit) CancelContext() { + bwu.cancel() +} + // =============================================================================================== // func newUnknownWorker(w *Workceptor, unitID string, workType string) WorkUnit { uu := &unknownUnit{} - uu.BaseWorkUnit.Init(w, unitID, workType) + uu.BaseWorkUnit.Init(w, unitID, workType, FileSystem{}, nil) return uu } diff --git a/pkg/workceptor/workunitbase_test.go b/pkg/workceptor/workunitbase_test.go new file mode 100644 index 000000000..21a1819ef --- /dev/null +++ b/pkg/workceptor/workunitbase_test.go @@ -0,0 +1,366 @@ +package workceptor_test + +import ( + "context" + "errors" + "fmt" + "path" + "strings" + "testing" + "time" + + "github.com/ansible/receptor/pkg/logger" + "github.com/ansible/receptor/pkg/workceptor" + "github.com/ansible/receptor/pkg/workceptor/mock_workceptor" + "github.com/fsnotify/fsnotify" + "github.com/golang/mock/gomock" +) + +func TestIsComplete(t *testing.T) { + testCases := []struct { + name string + workState int + isComplete bool + }{ + {"Pending Work is Incomplete", workceptor.WorkStatePending, false}, + {"Running Work is Incomplete", workceptor.WorkStateRunning, false}, + {"Succeeded Work is Complete", workceptor.WorkStateSucceeded, true}, + {"Failed Work is Complete", workceptor.WorkStateFailed, true}, + {"Unknown Work is Incomplete", 999, false}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + if result := workceptor.IsComplete(tc.workState); result != tc.isComplete { + t.Errorf("expected %v, got %v", tc.isComplete, result) + } + }) + } +} + +func TestWorkStateToString(t *testing.T) { + testCases := []struct { + name string + workState int + description string + }{ + {"Pending Work Description", workceptor.WorkStatePending, "Pending"}, + {"Running Work Description", workceptor.WorkStateRunning, "Running"}, + {"Succeeded Work Description", workceptor.WorkStateSucceeded, "Succeeded"}, + {"Failed Work Description", workceptor.WorkStateFailed, "Failed"}, + {"Unknown Work Description", 999, "Unknown"}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + if result := workceptor.WorkStateToString(tc.workState); result != tc.description { + t.Errorf("expected %s, got %s", tc.description, result) + } + }) + } +} + +func TestIsPending(t *testing.T) { + testCases := []struct { + name string + err error + isPending bool + }{ + {"Pending Error", workceptor.ErrPending, true}, + {"Non-pending Error", errors.New("test error"), false}, + {"Nil Error", nil, false}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + if result := workceptor.IsPending(tc.err); result != tc.isPending { + t.Errorf("expected %v, got %v", tc.isPending, result) + } + }) + } +} + +func setUp(t *testing.T) (*gomock.Controller, workceptor.BaseWorkUnit, *workceptor.Workceptor, *mock_workceptor.MockNetceptorForWorkceptor) { + ctrl := gomock.NewController(t) + + mockNetceptor := mock_workceptor.NewMockNetceptorForWorkceptor(ctrl) + + // attach logger to the mock netceptor and return any number of times + logger := logger.NewReceptorLogger("") + mockNetceptor.EXPECT().GetLogger().AnyTimes().Return(logger) + mockNetceptor.EXPECT().NodeID().Return("NodeID") + ctx := context.Background() + w, err := workceptor.New(ctx, mockNetceptor, "/tmp") + if err != nil { + t.Errorf("Error while creating Workceptor: %v", err) + } + + bwu := workceptor.BaseWorkUnit{} + + return ctrl, bwu, w, mockNetceptor +} + +func TestInit(t *testing.T) { + ctrl, bwu, w, _ := setUp(t) + bwu.Init(w, "test", "test", workceptor.FileSystem{}, nil) + ctrl.Finish() +} + +func TestErrorLog(t *testing.T) { + ctrl, bwu, w, _ := setUp(t) + bwu.Init(w, "test", "test", workceptor.FileSystem{}, &workceptor.RealWatcher{}) + bwu.Error("test error") + ctrl.Finish() +} + +func TestWarningLog(t *testing.T) { + ctrl, bwu, w, _ := setUp(t) + bwu.Init(w, "test", "test", workceptor.FileSystem{}, &workceptor.RealWatcher{}) + bwu.Warning("test warning") + ctrl.Finish() +} + +func TestInfoLog(t *testing.T) { + ctrl, bwu, w, _ := setUp(t) + bwu.Init(w, "test", "test", workceptor.FileSystem{}, &workceptor.RealWatcher{}) + bwu.Info("test info") + ctrl.Finish() +} + +func TestDebugLog(t *testing.T) { + ctrl, bwu, w, _ := setUp(t) + bwu.Init(w, "test", "test", workceptor.FileSystem{}, &workceptor.RealWatcher{}) + bwu.Error("test debug") + ctrl.Finish() +} + +func TestSetFromParams(t *testing.T) { + ctrl, bwu, w, _ := setUp(t) + bwu.Init(w, "test", "test", workceptor.FileSystem{}, &workceptor.RealWatcher{}) + err := bwu.SetFromParams(nil) + if err != nil { + t.Errorf("SetFromParams should return nil: got %v", err) + } + ctrl.Finish() +} + +func TestUnitDir(t *testing.T) { + ctrl, bwu, w, _ := setUp(t) + bwu.Init(w, "test", "test", workceptor.FileSystem{}, &workceptor.RealWatcher{}) + expectedUnitDir := path.Join("/tmp", "NodeID/test") + if unitDir := bwu.UnitDir(); unitDir != expectedUnitDir { + t.Errorf("UnitDir returned wrong value: got %s, want %s", unitDir, expectedUnitDir) + } + ctrl.Finish() +} + +func TestID(t *testing.T) { + ctrl, bwu, w, _ := setUp(t) + bwu.Init(w, "test", "test", workceptor.FileSystem{}, &workceptor.RealWatcher{}) + if id := bwu.ID(); id != "test" { + t.Errorf("ID returned wrong value: got %s, want %s", id, "test") + } + ctrl.Finish() +} + +func TestStatusFileName(t *testing.T) { + ctrl, bwu, w, _ := setUp(t) + bwu.Init(w, "test", "", workceptor.FileSystem{}, &workceptor.RealWatcher{}) + expectedUnitDir := path.Join("/tmp", "NodeID/test") + expectedStatusFileName := path.Join(expectedUnitDir, "status") + if statusFileName := bwu.StatusFileName(); statusFileName != expectedStatusFileName { + t.Errorf("StatusFileName returned wrong value: got %s, want %s", statusFileName, expectedStatusFileName) + } + ctrl.Finish() +} + +func TestStdoutFileName(t *testing.T) { + ctrl, bwu, w, _ := setUp(t) + bwu.Init(w, "test", "", workceptor.FileSystem{}, &workceptor.RealWatcher{}) + expectedUnitDir := path.Join("/tmp", "NodeID/test") + expectedStdoutFileName := path.Join(expectedUnitDir, "stdout") + if stdoutFileName := bwu.StdoutFileName(); stdoutFileName != expectedStdoutFileName { + t.Errorf("StdoutFileName returned wrong value: got %s, want %s", stdoutFileName, expectedStdoutFileName) + } + ctrl.Finish() +} + +func TestBaseSave(t *testing.T) { + ctrl, bwu, w, _ := setUp(t) + bwu.Init(w, "test", "", workceptor.FileSystem{}, &workceptor.RealWatcher{}) + err := bwu.Save() + if !strings.Contains(err.Error(), "no such file or directory") { + t.Errorf("Base Work Unit Save, no such file or directory expected, instead %s", err.Error()) + } + ctrl.Finish() +} + +func TestBaseLoad(t *testing.T) { + ctrl, bwu, w, _ := setUp(t) + bwu.Init(w, "test", "", workceptor.FileSystem{}, &workceptor.RealWatcher{}) + err := bwu.Load() + if !strings.Contains(err.Error(), "no such file or directory") { + t.Errorf("TestBaseLoad, no such file or directory expected, instead %s", err.Error()) + } + ctrl.Finish() +} + +func TestBaseUpdateFullStatus(t *testing.T) { + ctrl, bwu, w, _ := setUp(t) + bwu.Init(w, "test", "", workceptor.FileSystem{}, &workceptor.RealWatcher{}) + sf := func(sfd *workceptor.StatusFileData) { return } + bwu.UpdateFullStatus(sf) + err := bwu.LastUpdateError() + if !strings.Contains(err.Error(), "no such file or directory") { + t.Errorf("TestBaseUpdateFullStatus, no such file or directory expected, instead %s", err.Error()) + } + ctrl.Finish() +} + +func TestBaseUpdateBasicStatus(t *testing.T) { + ctrl, bwu, w, _ := setUp(t) + bwu.Init(w, "test", "", workceptor.FileSystem{}, &workceptor.RealWatcher{}) + bwu.UpdateBasicStatus(1, "Details", 0) + err := bwu.LastUpdateError() + if !strings.Contains(err.Error(), "no such file or directory") { + t.Errorf("TestBaseUpdateBasicStatus, no such file or directory expected, instead %s", err.Error()) + } + ctrl.Finish() +} + +func TestBaseStatus(t *testing.T) { + ctrl, bwu, w, _ := setUp(t) + bwu.Init(w, "test", "", workceptor.FileSystem{}, &workceptor.RealWatcher{}) + status := bwu.Status() + if status.State != workceptor.WorkStatePending { + t.Errorf("TestBaseStatus, expected work state pending, recieved %d", status.State) + } + ctrl.Finish() +} + +func TestBaseRelease(t *testing.T) { + ctrl, bwu, w, _ := setUp(t) + mockFileSystem := mock_workceptor.NewMockFileSystemer(ctrl) + bwu.Init(w, "test", "", mockFileSystem, &workceptor.RealWatcher{}) + + testCases := []struct { + name string + err error + force bool + calls func() + }{ + { + name: "RemoveAll Error", + err: errors.New("RemoveAll Error"), + force: false, + calls: func() { mockFileSystem.EXPECT().RemoveAll(gomock.Any()).Return(errors.New("RemoveAll Error")).Times(3) }, + }, + { + name: "No remote error without force", + err: nil, + force: false, + calls: func() { mockFileSystem.EXPECT().RemoveAll(gomock.Any()).Return(nil) }, + }, + { + name: "No remote error with force", + err: nil, + force: true, + calls: func() { mockFileSystem.EXPECT().RemoveAll(gomock.Any()).Return(nil) }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + tc.calls() + err := bwu.Release(tc.force) + if err != nil && err.Error() != tc.err.Error() { + t.Errorf("Error returned dosent match, err recieved %s, expected %s", err, tc.err) + } + }) + } + + ctrl.Finish() +} + +func TestMonitorLocalStatus(t *testing.T) { + tests := []struct { + name string + statObj *Info + statObjLater *Info + addWatcherErr error + statErr error + fsNotifyEvent *fsnotify.Event // using pointer to allow nil + sleepDuration time.Duration + }{ + { + name: "Handle Write Event", + statObj: NewInfo("test", 1, 0, time.Now()), + addWatcherErr: nil, + statErr: nil, + fsNotifyEvent: &fsnotify.Event{Op: fsnotify.Write}, + sleepDuration: 100 * time.Millisecond, + }, + { + name: "Error Adding Watcher", + statObj: NewInfo("test", 1, 0, time.Now()), + addWatcherErr: fmt.Errorf("error adding watcher"), + statErr: nil, + fsNotifyEvent: nil, + sleepDuration: 100 * time.Millisecond, + }, + { + name: "Error Reading Status", + statObj: nil, + addWatcherErr: fmt.Errorf("error adding watcher"), + statErr: fmt.Errorf("stat error"), + fsNotifyEvent: nil, + sleepDuration: 100 * time.Millisecond, + }, + { + name: "Handle Context Cancellation", + statObj: NewInfo("test", 1, 0, time.Now()), + addWatcherErr: nil, + statErr: nil, + fsNotifyEvent: &fsnotify.Event{Op: fsnotify.Write}, + sleepDuration: 100 * time.Millisecond, + }, + { + name: "Handle File Update Without Event", + statObj: NewInfo("test", 1, 0, time.Now()), + statObjLater: NewInfo("test", 1, 0, time.Now().Add(10*time.Second)), + addWatcherErr: nil, + statErr: nil, + fsNotifyEvent: &fsnotify.Event{Op: fsnotify.Write}, + sleepDuration: 500 * time.Millisecond, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctrl, bwu, w, _ := setUp(t) + defer ctrl.Finish() + + mockWatcher := mock_workceptor.NewMockWatcherWrapper(ctrl) + mockFileSystem := mock_workceptor.NewMockFileSystemer(ctrl) + bwu.Init(w, "test", "", mockFileSystem, mockWatcher) + + mockFileSystem.EXPECT().Stat(gomock.Any()).Return(tc.statObj, tc.statErr).AnyTimes() + if tc.statObjLater != nil { + mockFileSystem.EXPECT().Stat(gomock.Any()).Return(tc.statObjLater, nil).AnyTimes() + } + mockWatcher.EXPECT().Add(gomock.Any()).Return(tc.addWatcherErr) + mockWatcher.EXPECT().Close().AnyTimes() + + if tc.fsNotifyEvent != nil { + eventCh := make(chan fsnotify.Event, 1) + mockWatcher.EXPECT().EventChannel().Return(eventCh).AnyTimes() + go func() { eventCh <- *tc.fsNotifyEvent }() + } + + go bwu.MonitorLocalStatus() + + time.Sleep(tc.sleepDuration) + bwu.CancelContext() + }) + } +} From 81fb5e064948b9c817a3c5619e91258957cccb78 Mon Sep 17 00:00:00 2001 From: Aaron Hetherington Date: Wed, 4 Oct 2023 11:04:44 +0100 Subject: [PATCH 2/3] Fix code smells and linting errors --- pkg/workceptor/workunitbase.go | 25 +++++++++------------ pkg/workceptor/workunitbase_test.go | 35 ++++++++++++++++++----------- 2 files changed, 32 insertions(+), 28 deletions(-) diff --git a/pkg/workceptor/workunitbase.go b/pkg/workceptor/workunitbase.go index ee5088d89..4f99fce93 100644 --- a/pkg/workceptor/workunitbase.go +++ b/pkg/workceptor/workunitbase.go @@ -378,29 +378,26 @@ func (bwu *BaseWorkUnit) LastUpdateError() error { // MonitorLocalStatus watches a unit dir and keeps the in-memory workUnit up to date with status changes. func (bwu *BaseWorkUnit) MonitorLocalStatus() { statusFile := path.Join(bwu.UnitDir(), "status") + var watcherEvents chan fsnotify.Event + watcherEvents = make(chan fsnotify.Event) + if bwu.watcher != nil { err := bwu.watcher.Add(statusFile) if err == nil { defer func() { _ = bwu.watcher.Close() }() + watcherEvents = bwu.watcher.EventChannel() } else { _ = bwu.watcher.Close() bwu.watcher = nil } - } else { - bwu.watcher = nil } fi, err := bwu.fs.Stat(statusFile) if err != nil { fi = nil } - var watcherEvents chan fsnotify.Event - if bwu.watcher == nil { - watcherEvents = make(chan fsnotify.Event) - } else { - watcherEvents = bwu.watcher.EventChannel() - } + loop: for { select { @@ -415,13 +412,11 @@ loop: } case <-time.After(time.Second): newFi, err := bwu.fs.Stat(statusFile) - if err == nil { - if fi == nil || fi.ModTime() != newFi.ModTime() { - fi = newFi - err = bwu.Load() - if err != nil { - bwu.w.nc.GetLogger().Error("Error reading %s: %s", statusFile, err) - } + if err == nil && (fi == nil || fi.ModTime() != newFi.ModTime()) { + fi = newFi + err = bwu.Load() + if err != nil { + bwu.w.nc.GetLogger().Error("Error reading %s: %s", statusFile, err) } } } diff --git a/pkg/workceptor/workunitbase_test.go b/pkg/workceptor/workunitbase_test.go index 21a1819ef..702ec5723 100644 --- a/pkg/workceptor/workunitbase_test.go +++ b/pkg/workceptor/workunitbase_test.go @@ -144,10 +144,16 @@ func TestSetFromParams(t *testing.T) { ctrl.Finish() } +const ( + rootDir = "/tmp" + testDir = "NodeID/test" + dirError = "no such file or directory" +) + func TestUnitDir(t *testing.T) { ctrl, bwu, w, _ := setUp(t) bwu.Init(w, "test", "test", workceptor.FileSystem{}, &workceptor.RealWatcher{}) - expectedUnitDir := path.Join("/tmp", "NodeID/test") + expectedUnitDir := path.Join(rootDir, testDir) if unitDir := bwu.UnitDir(); unitDir != expectedUnitDir { t.Errorf("UnitDir returned wrong value: got %s, want %s", unitDir, expectedUnitDir) } @@ -166,7 +172,7 @@ func TestID(t *testing.T) { func TestStatusFileName(t *testing.T) { ctrl, bwu, w, _ := setUp(t) bwu.Init(w, "test", "", workceptor.FileSystem{}, &workceptor.RealWatcher{}) - expectedUnitDir := path.Join("/tmp", "NodeID/test") + expectedUnitDir := path.Join(rootDir, testDir) expectedStatusFileName := path.Join(expectedUnitDir, "status") if statusFileName := bwu.StatusFileName(); statusFileName != expectedStatusFileName { t.Errorf("StatusFileName returned wrong value: got %s, want %s", statusFileName, expectedStatusFileName) @@ -177,7 +183,7 @@ func TestStatusFileName(t *testing.T) { func TestStdoutFileName(t *testing.T) { ctrl, bwu, w, _ := setUp(t) bwu.Init(w, "test", "", workceptor.FileSystem{}, &workceptor.RealWatcher{}) - expectedUnitDir := path.Join("/tmp", "NodeID/test") + expectedUnitDir := path.Join(rootDir, testDir) expectedStdoutFileName := path.Join(expectedUnitDir, "stdout") if stdoutFileName := bwu.StdoutFileName(); stdoutFileName != expectedStdoutFileName { t.Errorf("StdoutFileName returned wrong value: got %s, want %s", stdoutFileName, expectedStdoutFileName) @@ -189,7 +195,7 @@ func TestBaseSave(t *testing.T) { ctrl, bwu, w, _ := setUp(t) bwu.Init(w, "test", "", workceptor.FileSystem{}, &workceptor.RealWatcher{}) err := bwu.Save() - if !strings.Contains(err.Error(), "no such file or directory") { + if !strings.Contains(err.Error(), dirError) { t.Errorf("Base Work Unit Save, no such file or directory expected, instead %s", err.Error()) } ctrl.Finish() @@ -199,7 +205,7 @@ func TestBaseLoad(t *testing.T) { ctrl, bwu, w, _ := setUp(t) bwu.Init(w, "test", "", workceptor.FileSystem{}, &workceptor.RealWatcher{}) err := bwu.Load() - if !strings.Contains(err.Error(), "no such file or directory") { + if !strings.Contains(err.Error(), dirError) { t.Errorf("TestBaseLoad, no such file or directory expected, instead %s", err.Error()) } ctrl.Finish() @@ -208,10 +214,12 @@ func TestBaseLoad(t *testing.T) { func TestBaseUpdateFullStatus(t *testing.T) { ctrl, bwu, w, _ := setUp(t) bwu.Init(w, "test", "", workceptor.FileSystem{}, &workceptor.RealWatcher{}) - sf := func(sfd *workceptor.StatusFileData) { return } + sf := func(sfd *workceptor.StatusFileData) { + // Do nothing + } bwu.UpdateFullStatus(sf) err := bwu.LastUpdateError() - if !strings.Contains(err.Error(), "no such file or directory") { + if !strings.Contains(err.Error(), dirError) { t.Errorf("TestBaseUpdateFullStatus, no such file or directory expected, instead %s", err.Error()) } ctrl.Finish() @@ -222,7 +230,7 @@ func TestBaseUpdateBasicStatus(t *testing.T) { bwu.Init(w, "test", "", workceptor.FileSystem{}, &workceptor.RealWatcher{}) bwu.UpdateBasicStatus(1, "Details", 0) err := bwu.LastUpdateError() - if !strings.Contains(err.Error(), "no such file or directory") { + if !strings.Contains(err.Error(), dirError) { t.Errorf("TestBaseUpdateBasicStatus, no such file or directory expected, instead %s", err.Error()) } ctrl.Finish() @@ -233,7 +241,7 @@ func TestBaseStatus(t *testing.T) { bwu.Init(w, "test", "", workceptor.FileSystem{}, &workceptor.RealWatcher{}) status := bwu.Status() if status.State != workceptor.WorkStatePending { - t.Errorf("TestBaseStatus, expected work state pending, recieved %d", status.State) + t.Errorf("TestBaseStatus, expected work state pending, received %d", status.State) } ctrl.Finish() } @@ -243,6 +251,7 @@ func TestBaseRelease(t *testing.T) { mockFileSystem := mock_workceptor.NewMockFileSystemer(ctrl) bwu.Init(w, "test", "", mockFileSystem, &workceptor.RealWatcher{}) + const removeError = "RemoveAll Error" testCases := []struct { name string err error @@ -250,10 +259,10 @@ func TestBaseRelease(t *testing.T) { calls func() }{ { - name: "RemoveAll Error", - err: errors.New("RemoveAll Error"), + name: removeError, + err: errors.New(removeError), force: false, - calls: func() { mockFileSystem.EXPECT().RemoveAll(gomock.Any()).Return(errors.New("RemoveAll Error")).Times(3) }, + calls: func() { mockFileSystem.EXPECT().RemoveAll(gomock.Any()).Return(errors.New(removeError)).Times(3) }, }, { name: "No remote error without force", @@ -274,7 +283,7 @@ func TestBaseRelease(t *testing.T) { tc.calls() err := bwu.Release(tc.force) if err != nil && err.Error() != tc.err.Error() { - t.Errorf("Error returned dosent match, err recieved %s, expected %s", err, tc.err) + t.Errorf("Error returned dosent match, err received %s, expected %s", err, tc.err) } }) } From 0fd7df6ddd1d45f6f555c4e14f05b308d3da30fd Mon Sep 17 00:00:00 2001 From: Aaron Hetherington Date: Mon, 9 Oct 2023 14:58:39 +0100 Subject: [PATCH 3/3] Add comment for new WatcherWrapper interface --- pkg/workceptor/workunitbase.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/workceptor/workunitbase.go b/pkg/workceptor/workunitbase.go index 4f99fce93..291e95978 100644 --- a/pkg/workceptor/workunitbase.go +++ b/pkg/workceptor/workunitbase.go @@ -32,6 +32,7 @@ const ( WorkStateCanceled = 4 ) +// WatcherWrapper is wrapping the fsnofity Watcher struct and exposing the Event chan within. type WatcherWrapper interface { Add(name string) error Close() error