Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: execution improvements #6203

Merged
merged 11 commits into from
Feb 28, 2025
7 changes: 7 additions & 0 deletions api/v1/testkube.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8669,6 +8669,10 @@ components:
type: string
format: date-time
description: when the execution has been scheduled to run
assignedAt:
type: string
format: date-time
description: when the execution has been assigned to some runner
statusAt:
type: string
format: date-time
Expand Down Expand Up @@ -8729,6 +8733,9 @@ components:
description: identifier for group of correlated executions
format: bson objectId
example: "62f395e004109209b50edfc1"
runnerId:
type: string
description: identifier of the runner where it has been executed
name:
type: string
description: execution name
Expand Down
2 changes: 2 additions & 0 deletions pkg/api/v1/testkube/model_test_workflow_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type TestWorkflowExecution struct {
Number int32 `json:"number,omitempty"`
// when the execution has been scheduled to run
ScheduledAt time.Time `json:"scheduledAt,omitempty"`
// when the execution has been assigned to some runner
AssignedAt time.Time `json:"assignedAt,omitempty"`
// when the execution result's status has changed last time (queued, passed, failed)
StatusAt time.Time `json:"statusAt,omitempty"`
// structured tree of steps
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type TestWorkflowExecutionSummary struct {
Id string `json:"id"`
// identifier for group of correlated executions
GroupId string `json:"groupId,omitempty"`
// identifier of the runner where it has been executed
RunnerId string `json:"runnerId,omitempty"`
// execution name
Name string `json:"name"`
// sequence number for the execution
Expand Down
6 changes: 5 additions & 1 deletion pkg/cloud/data/testworkflow/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,14 @@ func (r *CloudRepository) Init(ctx context.Context, id string, data testworkflow
execution.Namespace = data.Namespace
execution.Signature = data.Signature
execution.RunnerId = data.RunnerID
execution.AssignedAt = data.AssignedAt
if execution.AssignedAt.IsZero() {
execution.AssignedAt = time.Now()
}
return r.Update(ctx, execution)
}

func (r *CloudRepository) Assign(ctx context.Context, id string, prevRunnerId string, newRunnerId string) (bool, error) {
func (r *CloudRepository) Assign(ctx context.Context, id string, prevRunnerId string, newRunnerId string, assignedAt *time.Time) (bool, error) {
return false, errors.New("not supported")
}

Expand Down
11 changes: 10 additions & 1 deletion pkg/cloud/service.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/controlplane/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (s *Server) InitExecution(ctx context.Context, req *cloud.InitExecutionRequ
if err != nil {
return nil, err
}
err = s.resultsRepository.Init(ctx, req.Id, testworkflow.InitData{RunnerID: "oss", Namespace: req.Namespace, Signature: signature})
err = s.resultsRepository.Init(ctx, req.Id, testworkflow.InitData{RunnerID: "oss", Namespace: req.Namespace, Signature: signature, AssignedAt: time.Now()})
if err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/controlplaneclient/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"io"
"net/http"
"time"

"github.com/pkg/errors"
"google.golang.org/protobuf/types/known/structpb"
Expand Down Expand Up @@ -241,6 +242,7 @@ func (c *client) legacyInitExecution(ctx context.Context, environmentId, executi
execution.RunnerId = c.proContext.Agent.ID
execution.Namespace = namespace
execution.Signature = signature
execution.AssignedAt = time.Now()
jsonPayload, err := json.Marshal(cloudtestworkflow.ExecutionUpdateRequest{
WorkflowExecution: *execution,
})
Expand Down
48 changes: 48 additions & 0 deletions pkg/repository/testworkflow/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ type FilterImpl struct {
FActorName string
FActorType testkube.TestWorkflowRunningContextActorType
FGroupID string
FRunnerID string
FInitialized *bool
FAssigned *bool
}

func NewExecutionsFilter() *FilterImpl {
Expand Down Expand Up @@ -107,6 +110,21 @@ func (f *FilterImpl) WithGroupID(groupID string) *FilterImpl {
return f
}

func (f *FilterImpl) WithRunnerID(runnerID string) *FilterImpl {
f.FRunnerID = runnerID
return f
}

func (f *FilterImpl) WithInitialized(initialized bool) *FilterImpl {
f.FInitialized = &initialized
return f
}

func (f *FilterImpl) WithAssigned(assigned bool) *FilterImpl {
f.FAssigned = &assigned
return f
}

func (f FilterImpl) Name() string {
return f.FName
}
Expand Down Expand Up @@ -206,3 +224,33 @@ func (f FilterImpl) GroupIDDefined() bool {
func (f FilterImpl) GroupID() string {
return f.FGroupID
}

func (f FilterImpl) RunnerIDDefined() bool {
return f.FRunnerID != ""
}

func (f FilterImpl) RunnerID() string {
return f.FRunnerID
}

func (f FilterImpl) InitializedDefined() bool {
return f.FInitialized != nil
}

func (f FilterImpl) Initialized() bool {
if f.FInitialized == nil {
return false
}
return *f.FInitialized
}

func (f FilterImpl) AssignedDefined() bool {
return f.FAssigned != nil
}

func (f FilterImpl) Assigned() bool {
if f.FAssigned == nil {
return false
}
return *f.FAssigned
}
15 changes: 11 additions & 4 deletions pkg/repository/testworkflow/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ type LabelSelector struct {
}

type InitData struct {
RunnerID string
Namespace string
Signature []testkube.TestWorkflowSignature
RunnerID string
Namespace string
Signature []testkube.TestWorkflowSignature
AssignedAt time.Time
}

const PageDefaultLimit int = 100
Expand Down Expand Up @@ -53,6 +54,12 @@ type Filter interface {
ActorTypeDefined() bool
GroupID() string
GroupIDDefined() bool
RunnerID() string
RunnerIDDefined() bool
Assigned() bool
AssignedDefined() bool
Initialized() bool
InitializedDefined() bool
}

//go:generate mockgen -destination=./mock_repository.go -package=testworkflow "github.com/kubeshop/testkube/pkg/repository/testworkflow" Repository
Expand Down Expand Up @@ -101,7 +108,7 @@ type Repository interface {
// Init sets the initialization data from the runner
Init(ctx context.Context, id string, data InitData) error
// Assign execution to selected runner
Assign(ctx context.Context, id string, prevRunnerId string, newRunnerId string) (bool, error)
Assign(ctx context.Context, id string, prevRunnerId string, newRunnerId string, assignedAt *time.Time) (bool, error)
// AbortIfQueued marks execution as aborted if it's queued
AbortIfQueued(ctx context.Context, id string) (bool, error)
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/repository/testworkflow/mock_repository.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 31 additions & 2 deletions pkg/repository/testworkflow/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,34 @@ func composeQueryAndOpts(filter Filter) (bson.M, *options.FindOptions) {
query["runningcontext.actor.type_"] = filter.ActorType()
}

if filter.RunnerIDDefined() {
query["runnerid"] = filter.RunnerID()
} else if filter.AssignedDefined() {
if filter.Assigned() {
query["runnerid"] = bson.M{"$not": bson.M{"$in": bson.A{nil, ""}}}
} else {
query["runnerid"] = bson.M{"$in": bson.A{nil, ""}}
}
}

if filter.InitializedDefined() {
var q bson.M
if filter.Initialized() {
q = bson.M{"$expr": bson.M{"$or": bson.A{
bson.M{"$ne": bson.A{"$result.status", "queued"}},
bson.M{"$and": []bson.M{
{"$not": bson.M{"$in": bson.A{"$result.steps", bson.A{nil, bson.M{}}}}},
}},
}}}
} else {
q = bson.M{"$expr": bson.M{"$and": bson.A{
bson.M{"$eq": bson.A{"$result.status", "queued"}},
bson.M{"$in": bson.A{"$result.steps", bson.A{nil, bson.M{}}}},
}}}
}
query = bson.M{"$and": bson.A{query, q}}
}

if filter.GroupIDDefined() {
query = bson.M{"$and": bson.A{
bson.M{"$expr": bson.M{"$or": bson.A{
Expand Down Expand Up @@ -683,15 +711,16 @@ func (r *MongoRepository) Init(ctx context.Context, id string, data InitData) er
return err
}

func (r *MongoRepository) Assign(ctx context.Context, id string, prevRunnerId string, newRunnerId string) (bool, error) {
func (r *MongoRepository) Assign(ctx context.Context, id string, prevRunnerId string, newRunnerId string, assignedAt *time.Time) (bool, error) {
res, err := r.Coll.UpdateOne(ctx, bson.M{
"$and": []bson.M{
{"id": id},
{"result.status": bson.M{"$in": bson.A{testkube.QUEUED_TestWorkflowStatus, testkube.RUNNING_TestWorkflowStatus, testkube.PAUSED_TestWorkflowStatus}}},
{"$or": []bson.M{{"runnerid": prevRunnerId}, {"runnerid": newRunnerId}, {"runnerid": nil}}},
},
}, bson.M{"$set": map[string]interface{}{
"runnerid": newRunnerId,
"runnerid": newRunnerId,
"assignedat": assignedAt,
}})
if err != nil {
return false, err
Expand Down
1 change: 1 addition & 0 deletions pkg/runner/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ func (a *agentLoop) runTestWorkflow(environmentId string, executionId string, ex
execution.Namespace = result.Namespace
execution.Signature = result.Signature
execution.RunnerId = a.proContext.Agent.ID
execution.AssignedAt = time.Now()
if err = a.init(context.Background(), environmentId, execution); err != nil {
logger.Errorw("failed to mark execution as initialized", "error", err)
}
Expand Down
1 change: 0 additions & 1 deletion pkg/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ func New(
storageSkipVerify bool,
globalTemplateInlineYaml string,
) Runner {

var globalTemplateInline *testworkflowsv1.TestWorkflowTemplate
if globalTemplateInlineYaml != "" {
globalTemplateInline = new(testworkflowsv1.TestWorkflowTemplate)
Expand Down
2 changes: 2 additions & 0 deletions pkg/testworkflows/testworkflowexecutor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
errors2 "errors"
"io"
"math"
"time"

"github.com/pkg/errors"
"google.golang.org/grpc"
Expand Down Expand Up @@ -195,6 +196,7 @@ func (e *executor) executeDirect(ctx context.Context, environmentId string, req
if environmentId == "" {
execution.RunnerId = "oss"
}
execution.AssignedAt = time.Now()

// Start the execution
err = e.Start(environmentId, execution, sensitiveDataHandler.Get(execution.Id))
Expand Down
12 changes: 5 additions & 7 deletions pkg/testworkflows/testworkflowexecutor/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,10 @@ func (s *scheduler) update(ctx context.Context, execution *testkube.TestWorkflow
func (s *scheduler) init(ctx context.Context, execution *testkube.TestWorkflowExecution) error {
err := retry(SaveResultRetryMaxAttempts, SaveResultRetryBaseDelay, func() error {
err := s.resultsRepository.Init(ctx, execution.Id, testworkflow.InitData{
RunnerID: execution.RunnerId,
Namespace: execution.Namespace,
Signature: execution.Signature,
RunnerID: execution.RunnerId,
Namespace: execution.Namespace,
Signature: execution.Signature,
AssignedAt: execution.AssignedAt,
})
if err != nil {
s.logger.Warnw("failed to initialize the TestWorkflow execution in database", "recoverable", true, "executionId", execution.Id, "error", err)
Expand Down Expand Up @@ -333,10 +334,7 @@ func (s *scheduler) Schedule(ctx context.Context, sensitiveDataHandler Sensitive
matcher := make(map[string]*cloud.ExecutionTargetLabels)
maps.Copy(matcher, target.Match)
for k, v := range labels {
if matcher[k] == nil {
matcher[k] = &cloud.ExecutionTargetLabels{}
}
matcher[k] = &cloud.ExecutionTargetLabels{Labels: append(matcher[k].Labels, v)}
matcher[k] = &cloud.ExecutionTargetLabels{Labels: []string{v}}
}
selectors = append(selectors, &cloud.ScheduleExecution{
Selector: execution.Selector,
Expand Down
Loading
Loading