Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeout for running actions #496

Merged
merged 1 commit into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions act/act_helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package act

import (
"time"

sdkAct "github.com/gatewayd-io/gatewayd-plugin-sdk/act"
)

func createWaitActEntities(async bool) (
string,
map[string]*sdkAct.Action,
map[string]*sdkAct.Signal,
map[string]*sdkAct.Policy,
) {
name := "waitSync"
if async {
name = "waitAsync"
}
actions := map[string]*sdkAct.Action{
name: {
Name: name,
Metadata: nil,
Sync: !async,
Terminal: false,
Run: func(_ map[string]any, _ ...sdkAct.Parameter) (any, error) {
time.Sleep(1 * time.Second)
return true, nil
},
},
}
signals := map[string]*sdkAct.Signal{
name: {
Name: name,
Metadata: map[string]any{
"log": true,
"level": "info",
"message": "test",
"async": async,
},
},
}
policy := map[string]*sdkAct.Policy{
name: sdkAct.MustNewPolicy(
name,
`true`,
map[string]any{"log": "enabled"},
),
}

return name, actions, signals, policy
}
94 changes: 62 additions & 32 deletions act/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type Registry struct {
logger zerolog.Logger
// Timeout for policy evaluation.
policyTimeout time.Duration
// Default timeout for running actions
defaultActionTimeout time.Duration

Signals map[string]*sdkAct.Signal
Policies map[string]*sdkAct.Policy
Expand All @@ -41,6 +43,7 @@ func NewActRegistry(
builtinActions map[string]*sdkAct.Action,
defaultPolicy string,
policyTimeout time.Duration,
defaultActionTimeout time.Duration,
logger zerolog.Logger,
) *Registry {
if builtinSignals == nil || builtinsPolicies == nil || builtinActions == nil {
Expand Down Expand Up @@ -82,13 +85,14 @@ func NewActRegistry(
logger.Debug().Str("name", defaultPolicy).Msg("Using default policy")

return &Registry{
logger: logger,
policyTimeout: policyTimeout,
Signals: builtinSignals,
Policies: builtinsPolicies,
Actions: builtinActions,
DefaultPolicy: builtinsPolicies[defaultPolicy],
DefaultSignal: builtinSignals[defaultPolicy],
logger: logger,
policyTimeout: policyTimeout,
defaultActionTimeout: defaultActionTimeout,
Signals: builtinSignals,
Policies: builtinsPolicies,
Actions: builtinActions,
DefaultPolicy: builtinsPolicies[defaultPolicy],
DefaultSignal: builtinSignals[defaultPolicy],
}
}

Expand Down Expand Up @@ -229,41 +233,67 @@ func (r *Registry) Run(
// Prepend the logger to the parameters.
params = append([]sdkAct.Parameter{WithLogger(r.logger)}, params...)

timeout := r.defaultActionTimeout
if action.Timeout > 0 {
timeout = time.Duration(action.Timeout) * time.Second
}
var ctx context.Context
var cancel context.CancelFunc
// if timeout is zero, then the context should not have timeout
if timeout > 0 {
ctx, cancel = context.WithTimeout(context.Background(), timeout)
} else {
ctx, cancel = context.WithCancel(context.Background())
}
// If the action is synchronous, run it and return the result immediately.
if action.Sync {
r.logger.Debug().Fields(map[string]interface{}{
"executionMode": "sync",
"action": action.Name,
}).Msgf("Running action")

output, err := action.Run(output.Metadata, params...)
if err != nil {
r.logger.Error().Err(err).Str("action", action.Name).Msg("Error running action")
return nil, gerr.ErrRunningAction.Wrap(err)
}
return output, nil
defer cancel()
return runActionWithTimeout(ctx, action, output, params, r.logger)
}

r.logger.Debug().Fields(map[string]interface{}{
"executionMode": "async",
// Run the action asynchronously.
go func() {
defer cancel()
_, _ = runActionWithTimeout(ctx, action, output, params, r.logger)
}()
return nil, gerr.ErrAsyncAction
}

func runActionWithTimeout(
ctx context.Context,
action *sdkAct.Action,
output *sdkAct.Output,
params []sdkAct.Parameter,
logger zerolog.Logger,
) (any, *gerr.GatewayDError) {
execMode := "sync"
if !action.Sync {
execMode = "async"
}
logger.Debug().Fields(map[string]interface{}{
"executionMode": execMode,
"action": action.Name,
}).Msgf("Running action")
outputChan := make(chan any)
errChan := make(chan *gerr.GatewayDError)

// Run the action asynchronously.
// TODO: Add a way to cancel the action.
go func(
action *sdkAct.Action,
output *sdkAct.Output,
params []sdkAct.Parameter,
logger zerolog.Logger,
) {
_, err := action.Run(output.Metadata, params...)
go func() {
actionOutput, err := action.Run(output.Metadata, params...)
if err != nil {
logger.Error().Err(err).Str("action", action.Name).Msg("Error running action")
errChan <- gerr.ErrRunningAction.Wrap(err)
}
}(action, output, params, r.logger)

return nil, gerr.ErrAsyncAction
outputChan <- actionOutput
}()
select {
case <-ctx.Done():
logger.Error().Str("action", action.Name).Msg("Action timed out")
return nil, gerr.ErrRunningActionTimeout
case actionOutput := <-outputChan:
return actionOutput, nil
case err := <-errChan:
return nil, err
}
}

// WithLogger returns a parameter with the logger to be used by the action.
Expand Down
Loading