Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor work units and added tests for command.go #899

Merged
merged 21 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ build-all:
GOOS=windows go build -o receptor.exe ./cmd/receptor-cl && \
GOOS=darwin go build -o receptor.app ./cmd/receptor-cl && \
go build example/*.go && \
go build -o receptor --tags no_backends,no_services,no_tls_config,no_workceptor,no_cert_auth ./cmd/receptor-cl && \
go build -o receptor --tags no_backends,no_services,no_tls_config ./cmd/receptor-cl && \
go build -o receptor ./cmd/receptor-cl

DIST := receptor_$(shell echo '$(VERSION)' | sed 's/^v//')_$(GOOS)_$(GOARCH)
Expand Down
85 changes: 59 additions & 26 deletions pkg/workceptor/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,60 @@
package workceptor

import (
"context"
"flag"
"fmt"
"os"
"os/exec"
"os/signal"
"path"
"strings"
"sync"
"syscall"
"time"

"github.com/ghjm/cmdline"
"github.com/google/shlex"
)

type BaseWorkUnitForWorkUnit interface {
CancelContext()
ID() string
Init(w *Workceptor, unitID string, workType string, fs FileSystemer, watcher WatcherWrapper)
LastUpdateError() error
Load() error
MonitorLocalStatus()
Release(force bool) error
Save() error
SetFromParams(_ map[string]string) error
Status() *StatusFileData
StatusFileName() string
StdoutFileName() string
UnitDir() string
UnredactedStatus() *StatusFileData
UpdateBasicStatus(state int, detail string, stdoutSize int64)
UpdateFullStatus(statusFunc func(*StatusFileData))
GetStatusCopy() StatusFileData
GetStatusWithoutExtraData() *StatusFileData
SetStatusExtraData(interface{})
GetStatusLock() *sync.RWMutex
GetWorkceptor() *Workceptor
SetWorkceptor(*Workceptor)
GetContext() context.Context
GetCancel() context.CancelFunc
}

// commandUnit implements the WorkUnit interface for the Receptor command worker plugin.
type commandUnit struct {
BaseWorkUnit
BaseWorkUnitForWorkUnit
command string
baseParams string
allowRuntimeParams bool
done bool
}

// commandExtraData is the content of the ExtraData JSON field for a command worker.
type commandExtraData struct {
// CommandExtraData is the content of the ExtraData JSON field for a command worker.
type CommandExtraData struct {
Pid int
Params string
}
Expand Down Expand Up @@ -60,7 +89,7 @@
// commandRunner is run in a separate process, to monitor the subprocess and report back metadata.
func commandRunner(command string, params string, unitdir string) error {
status := StatusFileData{}
status.ExtraData = &commandExtraData{}
status.ExtraData = &CommandExtraData{}

Check warning on line 92 in pkg/workceptor/command.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/command.go#L92

Added line #L92 was not covered by tests
statusFilename := path.Join(unitdir, "status")
err := status.UpdateBasicStatus(statusFilename, WorkStatePending, "Not started yet", 0)
if err != nil {
Expand Down Expand Up @@ -169,7 +198,7 @@
if cmdParams != "" && !cw.allowRuntimeParams {
return fmt.Errorf("extra params provided but not allowed")
}
cw.status.ExtraData.(*commandExtraData).Params = combineParams(cw.baseParams, cmdParams)
cw.GetStatusCopy().ExtraData.(*CommandExtraData).Params = combineParams(cw.baseParams, cmdParams)

return nil
}
Expand All @@ -181,10 +210,10 @@

// UnredactedStatus returns a copy of the status currently loaded in memory, including secrets.
func (cw *commandUnit) UnredactedStatus() *StatusFileData {
cw.statusLock.RLock()
defer cw.statusLock.RUnlock()
status := cw.getStatus()
ed, ok := cw.status.ExtraData.(*commandExtraData)
cw.GetStatusLock().RLock()
defer cw.GetStatusLock().RUnlock()
status := cw.GetStatusWithoutExtraData()
ed, ok := cw.GetStatusCopy().ExtraData.(*CommandExtraData)
if ok {
edCopy := *ed
status.ExtraData = &edCopy
Expand All @@ -206,9 +235,9 @@
}
cw.UpdateFullStatus(func(status *StatusFileData) {
if status.ExtraData == nil {
status.ExtraData = &commandExtraData{}
status.ExtraData = &CommandExtraData{}

Check warning on line 238 in pkg/workceptor/command.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/command.go#L238

Added line #L238 was not covered by tests
}
status.ExtraData.(*commandExtraData).Pid = cmd.Process.Pid
status.ExtraData.(*CommandExtraData).Pid = cmd.Process.Pid

Check warning on line 240 in pkg/workceptor/command.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/command.go#L240

Added line #L240 was not covered by tests
})
doneChan := make(chan bool)
go func() {
Expand All @@ -226,8 +255,8 @@

// Start launches a job with given parameters.
func (cw *commandUnit) Start() error {
level := cw.w.nc.GetLogger().GetLogLevel()
levelName, _ := cw.w.nc.GetLogger().LogLevelToName(level)
level := cw.GetWorkceptor().nc.GetLogger().GetLogLevel()
levelName, _ := cw.GetWorkceptor().nc.GetLogger().LogLevelToName(level)
cw.UpdateBasicStatus(WorkStatePending, "Launching command runner", 0)

// TODO: This is another place where we rely on a pre-built binary for testing.
Expand All @@ -243,7 +272,7 @@
"--log-level", levelName,
"--command-runner",
fmt.Sprintf("command=%s", cw.command),
fmt.Sprintf("params=%s", cw.Status().ExtraData.(*commandExtraData).Params),
fmt.Sprintf("params=%s", cw.Status().ExtraData.(*CommandExtraData).Params),
fmt.Sprintf("unitdir=%s", cw.UnitDir()))

return cw.runCommand(cmd)
Expand All @@ -270,9 +299,9 @@

// Cancel stops a running job.
func (cw *commandUnit) Cancel() error {
cw.cancel()
cw.CancelContext()
status := cw.Status()
ced, ok := status.ExtraData.(*commandExtraData)
ced, ok := status.ExtraData.(*CommandExtraData)
if !ok || ced.Pid <= 0 {
return nil
}
Expand Down Expand Up @@ -304,7 +333,7 @@
return err
}

return cw.BaseWorkUnit.Release(force)
return cw.BaseWorkUnitForWorkUnit.Release(force)
}

// **************************************************************************
Expand All @@ -320,18 +349,22 @@
VerifySignature bool `description:"Verify a signed work submission" default:"false"`
}

func (cfg CommandWorkerCfg) NewWorker(w *Workceptor, unitID string, workType string) WorkUnit {
cw := &commandUnit{
BaseWorkUnit: BaseWorkUnit{
func (cfg CommandWorkerCfg) NewWorker(bwu BaseWorkUnitForWorkUnit, w *Workceptor, unitID string, workType string) WorkUnit {
if bwu == nil {
bwu = &BaseWorkUnit{

Check warning on line 354 in pkg/workceptor/command.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/command.go#L354

Added line #L354 was not covered by tests
status: StatusFileData{
ExtraData: &commandExtraData{},
ExtraData: &CommandExtraData{},

Check warning on line 356 in pkg/workceptor/command.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/command.go#L356

Added line #L356 was not covered by tests
},
},
command: cfg.Command,
baseParams: cfg.Params,
allowRuntimeParams: cfg.AllowRuntimeParams,
}
}

Check warning on line 359 in pkg/workceptor/command.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/command.go#L358-L359

Added lines #L358 - L359 were not covered by tests

cw := &commandUnit{
BaseWorkUnitForWorkUnit: bwu,
command: cfg.Command,
baseParams: cfg.Params,
allowRuntimeParams: cfg.AllowRuntimeParams,
}
cw.BaseWorkUnit.Init(w, unitID, workType, FileSystem{}, nil)
cw.BaseWorkUnitForWorkUnit.Init(w, unitID, workType, FileSystem{}, nil)

return cw
}
Expand Down
Loading
Loading