Skip to content

Commit 39f1500

Browse files
committed
Add unit tests for workunitbase and wrap fsnotify library calls
1 parent 45d7df5 commit 39f1500

File tree

10 files changed

+524
-24
lines changed

10 files changed

+524
-24
lines changed

pkg/workceptor/command.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ func (cw *commandUnit) runCommand(cmd *exec.Cmd) error {
219219
})
220220
}()
221221
go cmdWaiter(cmd, doneChan)
222-
go cw.monitorLocalStatus()
222+
go cw.MonitorLocalStatus()
223223

224224
return nil
225225
}
@@ -263,7 +263,7 @@ func (cw *commandUnit) Restart() error {
263263
// Job never started - mark it failed
264264
cw.UpdateBasicStatus(WorkStateFailed, "Pending at restart", stdoutSize(cw.UnitDir()))
265265
}
266-
go cw.monitorLocalStatus()
266+
go cw.MonitorLocalStatus()
267267

268268
return nil
269269
}
@@ -331,7 +331,7 @@ func (cfg CommandWorkerCfg) NewWorker(w *Workceptor, unitID string, workType str
331331
baseParams: cfg.Params,
332332
allowRuntimeParams: cfg.AllowRuntimeParams,
333333
}
334-
cw.BaseWorkUnit.Init(w, unitID, workType)
334+
cw.BaseWorkUnit.Init(w, unitID, workType, FileSystem{}, nil)
335335

336336
return cw
337337
}

pkg/workceptor/json_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func newCommandWorker(w *Workceptor, unitID string, workType string) WorkUnit {
2222
baseParams: "foo",
2323
allowRuntimeParams: true,
2424
}
25-
cw.BaseWorkUnit.Init(w, unitID, workType)
25+
cw.BaseWorkUnit.Init(w, unitID, workType, FileSystem{}, nil)
2626

2727
return cw
2828
}

pkg/workceptor/kubernetes.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1272,7 +1272,7 @@ func (kw *kubeUnit) startOrRestart() error {
12721272
} else {
12731273
go kw.runWorkUsingLogger()
12741274
}
1275-
go kw.monitorLocalStatus()
1275+
go kw.MonitorLocalStatus()
12761276

12771277
return nil
12781278
}
@@ -1388,7 +1388,7 @@ func (cfg KubeWorkerCfg) NewWorker(w *Workceptor, unitID string, workType string
13881388
deletePodOnRestart: cfg.DeletePodOnRestart,
13891389
namePrefix: fmt.Sprintf("%s-", strings.ToLower(cfg.WorkType)),
13901390
}
1391-
ku.BaseWorkUnit.Init(w, unitID, workType)
1391+
ku.BaseWorkUnit.Init(w, unitID, workType, FileSystem{}, nil)
13921392

13931393
return ku
13941394
}

pkg/workceptor/mock_workceptor/stdio_utils.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/workceptor/mock_workceptor/workunitbase.go

Lines changed: 76 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/workceptor/python.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func (cfg workPythonCfg) NewWorker(w *Workceptor, unitID string, workType string
6363
function: cfg.Function,
6464
config: cfg.Config,
6565
}
66-
cw.BaseWorkUnit.Init(w, unitID, workType)
66+
cw.BaseWorkUnit.Init(w, unitID, workType, FileSystem{}, nil)
6767

6868
return cw
6969
}

pkg/workceptor/remote_work.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -680,7 +680,7 @@ func (rw *remoteUnit) Release(force bool) error {
680680

681681
func newRemoteWorker(w *Workceptor, unitID, workType string) WorkUnit {
682682
rw := &remoteUnit{logger: w.nc.GetLogger()}
683-
rw.BaseWorkUnit.Init(w, unitID, workType)
683+
rw.BaseWorkUnit.Init(w, unitID, workType, FileSystem{}, nil)
684684
red := &remoteExtraData{}
685685
red.RemoteParams = make(map[string]string)
686686
rw.status.ExtraData = red

pkg/workceptor/stdio_utils.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ type FileSystemer interface {
1616
OpenFile(name string, flag int, perm os.FileMode) (*os.File, error)
1717
Stat(name string) (os.FileInfo, error)
1818
Open(name string) (*os.File, error)
19+
RemoveAll(path string) error
1920
}
2021

2122
// FileSystem represents the real filesystem.
@@ -36,6 +37,11 @@ func (FileSystem) Open(name string) (*os.File, error) {
3637
return os.Open(name)
3738
}
3839

40+
// RemoveAll removes path and any children it contains.
41+
func (FileSystem) RemoveAll(path string) error {
42+
return os.RemoveAll(path)
43+
}
44+
3945
// FileWriteCloser wraps io.WriteCloser.
4046
type FileWriteCloser interface {
4147
io.WriteCloser

pkg/workceptor/workunitbase.go

Lines changed: 54 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,28 @@ const (
3232
WorkStateCanceled = 4
3333
)
3434

35+
type WatcherWrapper interface {
36+
Add(name string) error
37+
Close() error
38+
EventChannel() chan fsnotify.Event
39+
}
40+
41+
type RealWatcher struct {
42+
watcher *fsnotify.Watcher
43+
}
44+
45+
func (rw *RealWatcher) Add(name string) error {
46+
return rw.watcher.Add(name)
47+
}
48+
49+
func (rw *RealWatcher) Close() error {
50+
return rw.watcher.Close()
51+
}
52+
53+
func (rw *RealWatcher) EventChannel() chan fsnotify.Event {
54+
return rw.watcher.Events
55+
}
56+
3557
// IsComplete returns true if a given WorkState indicates the job is finished.
3658
func IsComplete(workState int) bool {
3759
return workState == WorkStateSucceeded || workState == WorkStateFailed
@@ -74,10 +96,12 @@ type BaseWorkUnit struct {
7496
lastUpdateErrorLock *sync.RWMutex
7597
ctx context.Context
7698
cancel context.CancelFunc
99+
fs FileSystemer
100+
watcher WatcherWrapper
77101
}
78102

79103
// Init initializes the basic work unit data, in memory only.
80-
func (bwu *BaseWorkUnit) Init(w *Workceptor, unitID string, workType string) {
104+
func (bwu *BaseWorkUnit) Init(w *Workceptor, unitID string, workType string, fs FileSystemer, watcher WatcherWrapper) {
81105
bwu.w = w
82106
bwu.status.State = WorkStatePending
83107
bwu.status.Detail = "Unit Created"
@@ -90,6 +114,17 @@ func (bwu *BaseWorkUnit) Init(w *Workceptor, unitID string, workType string) {
90114
bwu.statusLock = &sync.RWMutex{}
91115
bwu.lastUpdateErrorLock = &sync.RWMutex{}
92116
bwu.ctx, bwu.cancel = context.WithCancel(w.ctx)
117+
bwu.fs = fs
118+
if watcher != nil {
119+
bwu.watcher = watcher
120+
} else {
121+
watcher, err := fsnotify.NewWatcher()
122+
if err == nil {
123+
bwu.watcher = &RealWatcher{watcher: watcher}
124+
} else {
125+
bwu.watcher = nil
126+
}
127+
}
93128
}
94129

95130
// Error logs message with unitID prepended.
@@ -340,32 +375,31 @@ func (bwu *BaseWorkUnit) LastUpdateError() error {
340375
return bwu.lastUpdateError
341376
}
342377

343-
// monitorLocalStatus watches a unit dir and keeps the in-memory workUnit up to date with status changes.
344-
func (bwu *BaseWorkUnit) monitorLocalStatus() {
378+
// MonitorLocalStatus watches a unit dir and keeps the in-memory workUnit up to date with status changes.
379+
func (bwu *BaseWorkUnit) MonitorLocalStatus() {
345380
statusFile := path.Join(bwu.UnitDir(), "status")
346-
watcher, err := fsnotify.NewWatcher()
347-
if err == nil {
348-
err = watcher.Add(statusFile)
381+
if bwu.watcher != nil {
382+
err := bwu.watcher.Add(statusFile)
349383
if err == nil {
350384
defer func() {
351-
_ = watcher.Close()
385+
_ = bwu.watcher.Close()
352386
}()
353387
} else {
354-
_ = watcher.Close()
355-
watcher = nil
388+
_ = bwu.watcher.Close()
389+
bwu.watcher = nil
356390
}
357391
} else {
358-
watcher = nil
392+
bwu.watcher = nil
359393
}
360-
fi, err := os.Stat(statusFile)
394+
fi, err := bwu.fs.Stat(statusFile)
361395
if err != nil {
362396
fi = nil
363397
}
364398
var watcherEvents chan fsnotify.Event
365-
if watcher == nil {
399+
if bwu.watcher == nil {
366400
watcherEvents = make(chan fsnotify.Event)
367401
} else {
368-
watcherEvents = watcher.Events
402+
watcherEvents = bwu.watcher.EventChannel()
369403
}
370404
loop:
371405
for {
@@ -380,7 +414,7 @@ loop:
380414
}
381415
}
382416
case <-time.After(time.Second):
383-
newFi, err := os.Stat(statusFile)
417+
newFi, err := bwu.fs.Stat(statusFile)
384418
if err == nil {
385419
if fi == nil || fi.ModTime() != newFi.ModTime() {
386420
fi = newFi
@@ -425,7 +459,7 @@ func (bwu *BaseWorkUnit) Release(force bool) error {
425459
defer bwu.statusLock.Unlock()
426460
attemptsLeft := 3
427461
for {
428-
err := os.RemoveAll(bwu.UnitDir())
462+
err := bwu.fs.RemoveAll(bwu.UnitDir())
429463
if force {
430464
break
431465
} else if err != nil {
@@ -451,11 +485,15 @@ func (bwu *BaseWorkUnit) Release(force bool) error {
451485
return nil
452486
}
453487

488+
func (bwu *BaseWorkUnit) CancelContext() {
489+
bwu.cancel()
490+
}
491+
454492
// =============================================================================================== //
455493

456494
func newUnknownWorker(w *Workceptor, unitID string, workType string) WorkUnit {
457495
uu := &unknownUnit{}
458-
uu.BaseWorkUnit.Init(w, unitID, workType)
496+
uu.BaseWorkUnit.Init(w, unitID, workType, FileSystem{}, nil)
459497

460498
return uu
461499
}

0 commit comments

Comments
 (0)