Skip to content

Commit

Permalink
Merge pull request #259 from kthcloud/dev
Browse files Browse the repository at this point in the history
fix bugs with resources not being deleted if runAfter jobs were present
  • Loading branch information
saffronjam authored Nov 9, 2023
2 parents 19a5094 + 065b3ba commit f787bde
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 18 deletions.
21 changes: 10 additions & 11 deletions models/sys/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,16 @@ const (
)

type Job struct {
ID string `bson:"id" json:"id"`
UserID string `bson:"userId" json:"userId"`
Type string `bson:"type" json:"type"`
Args map[string]interface{} `bson:"args" json:"args"`
ResourceIDs []string `bson:"resourceIds" json:"resourceIds"`
CreatedAt time.Time `bson:"createdAt" json:"createdAt"`
LastRunAt time.Time `bson:"lastRunAt" json:"lastRunAt"`
FinishedAt time.Time `bson:"finishedAt" json:"finishedAt"`
RunAfter time.Time `bson:"runAfter" json:"runAfter"`
Status string `bson:"status" json:"status"`
ErrorLogs []string `bson:"errorLogs" json:"errorLogs"`
ID string `bson:"id" json:"id"`
UserID string `bson:"userId" json:"userId"`
Type string `bson:"type" json:"type"`
Args map[string]interface{} `bson:"args" json:"args"`
CreatedAt time.Time `bson:"createdAt" json:"createdAt"`
LastRunAt time.Time `bson:"lastRunAt" json:"lastRunAt"`
FinishedAt time.Time `bson:"finishedAt" json:"finishedAt"`
RunAfter time.Time `bson:"runAfter" json:"runAfter"`
Status string `bson:"status" json:"status"`
ErrorLogs []string `bson:"errorLogs" json:"errorLogs"`
}

type UpdateParams struct {
Expand Down
4 changes: 2 additions & 2 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func DeleteVM(job *jobModel.Job) error {
defer cancel()

go func() {
err = waitForJobs(ctx, relatedJobs, []string{jobModel.StatusCompleted, jobModel.StatusFailed, jobModel.StatusTerminated})
err = waitForJobs(ctx, relatedJobs, []string{jobModel.StatusCompleted, jobModel.StatusTerminated})
if err != nil {
log.Println("failed to wait for related jobs", id, ". details:", err)
}
Expand Down Expand Up @@ -241,7 +241,7 @@ func DeleteDeployment(job *jobModel.Job) error {
defer cancel()

go func() {
err = waitForJobs(ctx, relatedJobs, []string{jobModel.StatusCompleted, jobModel.StatusFailed, jobModel.StatusTerminated})
err = waitForJobs(ctx, relatedJobs, []string{jobModel.StatusCompleted, jobModel.StatusTerminated})
if err != nil {
log.Println("failed to wait for related jobs", id, ". details:", err)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/jobs/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ func NewRunner(job *jobModel.Job) *Runner {
}

func (runner *Runner) Run() {

if jobDef := GetJobDef(runner.Job); jobDef != nil {
if jobDef.TerminateFunc != nil {
shouldTerminate, err := jobDef.TerminateFunc(runner.Job)
if err != nil {
utils.PrettyPrintError(fmt.Errorf("error executing job (%s) terminate function, terminating the job instead. details: %w", runner.Job.Type, err))

err = jobModel.New().MarkTerminated(runner.Job.ID, err.Error())
if err != nil {
utils.PrettyPrintError(fmt.Errorf("error marking job as terminated. details: %w", err))
Expand All @@ -32,6 +33,7 @@ func (runner *Runner) Run() {

if shouldTerminate {
err = jobModel.New().MarkTerminated(runner.Job.ID, "gracefully terminated by system")
utils.PrettyPrintError(fmt.Errorf("job (%s) gracefully terminated by system", runner.Job.Type))
if err != nil {
utils.PrettyPrintError(fmt.Errorf("error marking job as terminated. details: %w", err))
return
Expand Down
6 changes: 2 additions & 4 deletions pkg/workers/confirm/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func deploymentConfirmer(ctx context.Context) {
continue
}

relatedJobs, err := jobModel.New().GetByArgs(map[string]interface{}{
relatedJobs, err := jobModel.New().ExcludeScheduled().GetByArgs(map[string]interface{}{
"id": deployment.ID,
})

Expand All @@ -45,7 +45,6 @@ func deploymentConfirmer(ctx context.Context) {

allFinished := slices.IndexFunc(relatedJobs, func(j jobModel.Job) bool {
return j.Status != jobModel.StatusCompleted &&
j.Status != jobModel.StatusFailed &&
j.Status != jobModel.StatusTerminated
}) == -1

Expand Down Expand Up @@ -91,7 +90,7 @@ func vmConfirmer(ctx context.Context) {
continue
}

relatedJobs, err := jobModel.New().GetByArgs(map[string]interface{}{
relatedJobs, err := jobModel.New().ExcludeScheduled().GetByArgs(map[string]interface{}{
"id": vm.ID,
})

Expand All @@ -102,7 +101,6 @@ func vmConfirmer(ctx context.Context) {

allFinished := slices.IndexFunc(relatedJobs, func(j jobModel.Job) bool {
return j.Status != jobModel.StatusCompleted &&
j.Status != jobModel.StatusFailed &&
j.Status != jobModel.StatusTerminated
}) == -1

Expand Down

0 comments on commit f787bde

Please sign in to comment.