Skip to content

Commit

Permalink
Clean up jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
sevein committed Apr 30, 2024
1 parent 448d2b1 commit e792571
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 26 deletions.
2 changes: 1 addition & 1 deletion hack/ccp/internal/controller/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (i *iterator) runJob(ctx context.Context, id uuid.UUID) (uuid.UUID, error)

// buildJob configures a workflow job given the workflow chain link definition.
func (i *iterator) buildJob(wl *workflow.Link, logger logr.Logger) (*job, error) {
j, err := newJob(logger, i.chain, i.p, i.gearman, wl)
j, err := newJob(logger, i.chain, i.p, i.gearman, wl, i.wf)
if err != nil {
return nil, fmt.Errorf("build job: %v", err)
}
Expand Down
151 changes: 127 additions & 24 deletions hack/ccp/internal/controller/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,46 @@ import (
)

type job struct {
id uuid.UUID
logger logr.Logger

// gearman is used to dispatch jobs to MCPClient.
gearman *gearmin.Server

// id of the job.
id uuid.UUID

// createdAt is populated when the job is first created.
createdAt time.Time
logger logr.Logger
chain *chain
pkg *Package
wl *workflow.Link
gearman *gearmin.Server

// chain carries state across multiple jobs within a workflow chain.
chain *chain

// pkg is the package related to the job execution.
pkg *Package

// wl is the base configuration of each job.
wl *workflow.Link

// wf is used to validate preconfigured choices.
wf *workflow.Document

// jobRunner is what makes a job executable.
jobRunner
}

type jobRunner interface {
exec(context.Context) (uuid.UUID, error)
}

func newJob(logger logr.Logger, chain *chain, pkg *Package, gearman *gearmin.Server, wl *workflow.Link) (*job, error) {
func newJob(logger logr.Logger, chain *chain, pkg *Package, gearman *gearmin.Server, wl *workflow.Link, wf *workflow.Document) (*job, error) {
j := &job{
gearman: gearman,
id: uuid.New(),
createdAt: time.Now().UTC(),
chain: chain,
pkg: pkg,
wl: wl,
gearman: gearman,
wf: wf,
}

var err error
Expand Down Expand Up @@ -102,6 +120,7 @@ func (j *job) save(ctx context.Context) error {
})
}

// markAwaitingDecision is used by decision jobs to persist the awaiting status.
func (j *job) markAwaitingDecision(ctx context.Context) error {
err := j.pkg.store.UpdateJobStatus(ctx, j.id, "STATUS_AWAITING_DECISION")
if err != nil {
Expand All @@ -111,6 +130,7 @@ func (j *job) markAwaitingDecision(ctx context.Context) error {
return nil
}

// markComplete is used by decision jobs to persist the completion status.
func (j *job) markComplete(ctx context.Context) error {
err := j.pkg.store.UpdateJobStatus(ctx, j.id, "STATUS_COMPLETED_SUCCESSFULLY")
if err != nil {
Expand Down Expand Up @@ -138,6 +158,9 @@ func (j *job) updateStatusFromExitCode(ctx context.Context, code int) error {

// outputDecisionJob.
//
// A job that handles a workflow decision point, with choices based on script
// output.
//
// Manager: linkTaskManagerGetUserChoiceFromMicroserviceGeneratedList.
// Class: OutputDecisionJob(DecisionJob).
type outputDecisionJob struct {
Expand Down Expand Up @@ -167,12 +190,16 @@ func (l *outputDecisionJob) exec(ctx context.Context) (uuid.UUID, error) {
return uuid.Nil, fmt.Errorf("save: %v", err)
}

// TODO: store active agent with l.j.p.saveValue.
panic("not implemented")

return uuid.Nil, nil
}

// nextChainDecisionJob.
//
// A type of workflow decision that determines the next chain to be executed,
// by UUID.
//
// Manager: linkTaskManagerChoice.
// Class: NextChainDecisionJob(DecisionJob).
type nextChainDecisionJob struct {
Expand All @@ -194,18 +221,42 @@ func newNextChainDecisionJob(j *job) (*nextChainDecisionJob, error) {
}, nil
}

func (l *nextChainDecisionJob) exec(ctx context.Context) (uuid.UUID, error) {
func (l *nextChainDecisionJob) exec(ctx context.Context) (_ uuid.UUID, err error) {
defer func() {
if err != nil {
err = fmt.Errorf("nextChainDecisionJob: %v", err)
return
}
if e := l.j.markComplete(ctx); e != nil {
err = e
}
}()

if err := l.j.pkg.reload(ctx); err != nil {
return uuid.Nil, fmt.Errorf("reload: %v", err)
}
if err := l.j.save(ctx); err != nil {
return uuid.Nil, fmt.Errorf("save: %v", err)
}

// When we have a preconfigured choice.
if chainID, err := l.j.pkg.PreconfiguredChoice(l.j.wl.ID); err != nil {
// Use a preconfigured choice if it validates.
chainID, err := l.j.pkg.PreconfiguredChoice(l.j.wl.ID)
if err != nil {
return uuid.Nil, err
} else if chainID != uuid.Nil {
// Fail if the choice is not available in workflow.
var matched bool
for _, cid := range l.config.Choices {
if _, ok := l.j.wf.Chains[cid]; ok {
matched = true
}
}
if !matched {
return uuid.Nil, fmt.Errorf("choice %s is not one of the available choices", chainID)
}
if err := l.j.markComplete(ctx); err != nil {
return uuid.Nil, err
}
return chainID, nil
}

Expand All @@ -214,18 +265,39 @@ func (l *nextChainDecisionJob) exec(ctx context.Context) (uuid.UUID, error) {
for i, item := range l.config.Choices {
opts[i] = option(item.String())
}
if decision, err := l.j.pkg.AwaitDecision(ctx, opts); err != nil {
return uuid.Nil, fmt.Errorf("await decision: %v", err)
} else {
return decision.uuid(), nil

return l.await(ctx, opts)
}

func (l *nextChainDecisionJob) await(ctx context.Context, opts []option) (_ uuid.UUID, err error) {
defer func() {
if err != nil {
err = fmt.Errorf("await: %v", err)
return
}
}()

if err := l.j.markAwaitingDecision(ctx); err != nil {
return uuid.Nil, err
}

decision, err := l.j.pkg.AwaitDecision(ctx, opts)
if err != nil {
return uuid.Nil, err
}

// TODO: store active agent with l.j.p.saveValue.
return decision.uuid(), nil
}

// updateContextDecisionJob is a job that updates the chain context based on a user choice.
// updateContextDecisionJob.
//
// A job that updates the job chain context based on a user choice.
//
// Manager: linkTaskManagerReplacementDicFromChoice (14 matches).
// TODO: This type of job is mostly copied from the previous
// linkTaskManagerReplacementDicFromChoice, and it seems to have multiple ways
// of executing. It could use some cleanup.
//
// Manager: linkTaskManagerReplacementDicFromChoice.
// Class: UpdateContextDecisionJob(DecisionJob) (decisions.py).
type updateContextDecisionJob struct {
j *job
Expand All @@ -234,6 +306,16 @@ type updateContextDecisionJob struct {

var _ jobRunner = (*updateContextDecisionJob)(nil)

// Maps decision point UUIDs and decision UUIDs to their "canonical"
// equivalents. This is useful for when there are multiple decision points which
// are effectively identical and a preconfigured decision for one should hold
// for all of the others as well. For example, there are 5 "Assign UUIDs to
// directories?" decision points and making a processing config decision for the
// designated canonical one, in this case
// 'bd899573-694e-4d33-8c9b-df0af802437d', should result in that decision taking
// effect for all of the others as well. This allows that.
// TODO: this should be defined in the workflow, not hardcoded here.
//
// nolint: unused
var updateContextDecisionJobChoiceMapping = map[string]string{
// Decision point "Assign UUIDs to directories?".
Expand Down Expand Up @@ -265,21 +347,36 @@ func newUpdateContextDecisionJob(j *job) (*updateContextDecisionJob, error) {
}, nil
}

func (l *updateContextDecisionJob) exec(ctx context.Context) (uuid.UUID, error) {
func (l *updateContextDecisionJob) exec(ctx context.Context) (_ uuid.UUID, err error) {
defer func() {
if err != nil {
err = fmt.Errorf("nextChainDecisionJob: %v", err)
return
}
if e := l.j.markComplete(ctx); e != nil {
err = e
}
}()

if err := l.j.pkg.reload(ctx); err != nil {
return uuid.Nil, fmt.Errorf("reload: %v", err)
}
if err := l.j.save(ctx); err != nil {
return uuid.Nil, fmt.Errorf("save: %v", err)
}

panic("not implemented")

// TODO:
// - Load settings from DashboardSetting to update context.
// - Or load preconfigured choice to update context.
// - Or mark as awaiting.

id := l.j.wl.ExitCodes[0].LinkID
if id == nil || *id == uuid.Nil {
return uuid.Nil, errors.New("ops")
}

// TODO: store active agent with l.j.p.saveValue.

return *id, nil
}

Expand Down Expand Up @@ -406,11 +503,15 @@ func (l *filesClientScriptJob) exec(ctx context.Context) (uuid.UUID, error) {
return uuid.Nil, err
}

if ec, ok := l.j.wl.ExitCodes[exitCode]; ok && ec.LinkID != nil {
if ec, ok := l.j.wl.ExitCodes[exitCode]; ok {
if ec.LinkID == nil {
return uuid.Nil, io.EOF // End of chain.
}
return *ec.LinkID, nil
}

if l.j.wl.FallbackLinkID == uuid.Nil {
return uuid.Nil, io.EOF
return uuid.Nil, io.EOF // End of chain.
}

return l.j.wl.FallbackLinkID, nil
Expand Down Expand Up @@ -503,6 +604,8 @@ func (l *outputClientScriptJob) exec(ctx context.Context) (uuid.UUID, error) {
// }
// Then update generated_choices: self.job_chain.generated_choices = choices.

panic("not implemented")

return uuid.Nil, nil
}

Expand Down
3 changes: 2 additions & 1 deletion hack/ccp/internal/workflow/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import (
"os"
"testing"

"github.com/artefactual/archivematica/hack/ccp/internal/workflow"
"gotest.tools/v3/assert"

"github.com/artefactual/archivematica/hack/ccp/internal/workflow"
)

func TestParseConfig(t *testing.T) {
Expand Down

0 comments on commit e792571

Please sign in to comment.