Skip to content

Refactor work units and added tests for command.go #899

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ build-all:
GOOS=windows go build -o receptor.exe ./cmd/receptor-cl && \
GOOS=darwin go build -o receptor.app ./cmd/receptor-cl && \
go build example/*.go && \
go build -o receptor --tags no_backends,no_services,no_tls_config,no_workceptor,no_cert_auth ./cmd/receptor-cl && \
go build -o receptor --tags no_backends,no_services,no_tls_config ./cmd/receptor-cl && \
go build -o receptor ./cmd/receptor-cl

DIST := receptor_$(shell echo '$(VERSION)' | sed 's/^v//')_$(GOOS)_$(GOARCH)
Expand Down
85 changes: 59 additions & 26 deletions pkg/workceptor/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,60 @@
package workceptor

import (
"context"
"flag"
"fmt"
"os"
"os/exec"
"os/signal"
"path"
"strings"
"sync"
"syscall"
"time"

"github.com/ghjm/cmdline"
"github.com/google/shlex"
)

type BaseWorkUnitForWorkUnit interface {
CancelContext()
ID() string
Init(w *Workceptor, unitID string, workType string, fs FileSystemer, watcher WatcherWrapper)
LastUpdateError() error
Load() error
MonitorLocalStatus()
Release(force bool) error
Save() error
SetFromParams(_ map[string]string) error
Status() *StatusFileData
StatusFileName() string
StdoutFileName() string
UnitDir() string
UnredactedStatus() *StatusFileData
UpdateBasicStatus(state int, detail string, stdoutSize int64)
UpdateFullStatus(statusFunc func(*StatusFileData))
GetStatusCopy() StatusFileData
GetStatusWithoutExtraData() *StatusFileData
SetStatusExtraData(interface{})
GetStatusLock() *sync.RWMutex
GetWorkceptor() *Workceptor
SetWorkceptor(*Workceptor)
GetContext() context.Context
GetCancel() context.CancelFunc
}

// commandUnit implements the WorkUnit interface for the Receptor command worker plugin.
type commandUnit struct {
BaseWorkUnit
BaseWorkUnitForWorkUnit
command string
baseParams string
allowRuntimeParams bool
done bool
}

// commandExtraData is the content of the ExtraData JSON field for a command worker.
type commandExtraData struct {
// CommandExtraData is the content of the ExtraData JSON field for a command worker.
type CommandExtraData struct {
Pid int
Params string
}
Expand Down Expand Up @@ -60,7 +89,7 @@ func cmdWaiter(cmd *exec.Cmd, doneChan chan bool) {
// commandRunner is run in a separate process, to monitor the subprocess and report back metadata.
func commandRunner(command string, params string, unitdir string) error {
status := StatusFileData{}
status.ExtraData = &commandExtraData{}
status.ExtraData = &CommandExtraData{}
statusFilename := path.Join(unitdir, "status")
err := status.UpdateBasicStatus(statusFilename, WorkStatePending, "Not started yet", 0)
if err != nil {
Expand Down Expand Up @@ -169,7 +198,7 @@ func (cw *commandUnit) SetFromParams(params map[string]string) error {
if cmdParams != "" && !cw.allowRuntimeParams {
return fmt.Errorf("extra params provided but not allowed")
}
cw.status.ExtraData.(*commandExtraData).Params = combineParams(cw.baseParams, cmdParams)
cw.GetStatusCopy().ExtraData.(*CommandExtraData).Params = combineParams(cw.baseParams, cmdParams)

return nil
}
Expand All @@ -181,10 +210,10 @@ func (cw *commandUnit) Status() *StatusFileData {

// UnredactedStatus returns a copy of the status currently loaded in memory, including secrets.
func (cw *commandUnit) UnredactedStatus() *StatusFileData {
cw.statusLock.RLock()
defer cw.statusLock.RUnlock()
status := cw.getStatus()
ed, ok := cw.status.ExtraData.(*commandExtraData)
cw.GetStatusLock().RLock()
defer cw.GetStatusLock().RUnlock()
status := cw.GetStatusWithoutExtraData()
ed, ok := cw.GetStatusCopy().ExtraData.(*CommandExtraData)
if ok {
edCopy := *ed
status.ExtraData = &edCopy
Expand All @@ -206,9 +235,9 @@ func (cw *commandUnit) runCommand(cmd *exec.Cmd) error {
}
cw.UpdateFullStatus(func(status *StatusFileData) {
if status.ExtraData == nil {
status.ExtraData = &commandExtraData{}
status.ExtraData = &CommandExtraData{}
}
status.ExtraData.(*commandExtraData).Pid = cmd.Process.Pid
status.ExtraData.(*CommandExtraData).Pid = cmd.Process.Pid
})
doneChan := make(chan bool)
go func() {
Expand All @@ -226,8 +255,8 @@ func (cw *commandUnit) runCommand(cmd *exec.Cmd) error {

// Start launches a job with given parameters.
func (cw *commandUnit) Start() error {
level := cw.w.nc.GetLogger().GetLogLevel()
levelName, _ := cw.w.nc.GetLogger().LogLevelToName(level)
level := cw.GetWorkceptor().nc.GetLogger().GetLogLevel()
levelName, _ := cw.GetWorkceptor().nc.GetLogger().LogLevelToName(level)
cw.UpdateBasicStatus(WorkStatePending, "Launching command runner", 0)

// TODO: This is another place where we rely on a pre-built binary for testing.
Expand All @@ -243,7 +272,7 @@ func (cw *commandUnit) Start() error {
"--log-level", levelName,
"--command-runner",
fmt.Sprintf("command=%s", cw.command),
fmt.Sprintf("params=%s", cw.Status().ExtraData.(*commandExtraData).Params),
fmt.Sprintf("params=%s", cw.Status().ExtraData.(*CommandExtraData).Params),
fmt.Sprintf("unitdir=%s", cw.UnitDir()))

return cw.runCommand(cmd)
Expand All @@ -270,9 +299,9 @@ func (cw *commandUnit) Restart() error {

// Cancel stops a running job.
func (cw *commandUnit) Cancel() error {
cw.cancel()
cw.CancelContext()
status := cw.Status()
ced, ok := status.ExtraData.(*commandExtraData)
ced, ok := status.ExtraData.(*CommandExtraData)
if !ok || ced.Pid <= 0 {
return nil
}
Expand Down Expand Up @@ -304,7 +333,7 @@ func (cw *commandUnit) Release(force bool) error {
return err
}

return cw.BaseWorkUnit.Release(force)
return cw.BaseWorkUnitForWorkUnit.Release(force)
}

// **************************************************************************
Expand All @@ -320,18 +349,22 @@ type CommandWorkerCfg struct {
VerifySignature bool `description:"Verify a signed work submission" default:"false"`
}

func (cfg CommandWorkerCfg) NewWorker(w *Workceptor, unitID string, workType string) WorkUnit {
cw := &commandUnit{
BaseWorkUnit: BaseWorkUnit{
func (cfg CommandWorkerCfg) NewWorker(bwu BaseWorkUnitForWorkUnit, w *Workceptor, unitID string, workType string) WorkUnit {
if bwu == nil {
bwu = &BaseWorkUnit{
status: StatusFileData{
ExtraData: &commandExtraData{},
ExtraData: &CommandExtraData{},
},
},
command: cfg.Command,
baseParams: cfg.Params,
allowRuntimeParams: cfg.AllowRuntimeParams,
}
}

cw := &commandUnit{
BaseWorkUnitForWorkUnit: bwu,
command: cfg.Command,
baseParams: cfg.Params,
allowRuntimeParams: cfg.AllowRuntimeParams,
}
cw.BaseWorkUnit.Init(w, unitID, workType, FileSystem{}, nil)
cw.BaseWorkUnitForWorkUnit.Init(w, unitID, workType, FileSystem{}, nil)

return cw
}
Expand Down
Loading