Skip to content

Commit

Permalink
Fix #222 - (WIP): Basic raw implementation for DSL 1.0.0
Browse files Browse the repository at this point in the history
Signed-off-by: Ricardo Zanini <ricardozanini@gmail.com>
  • Loading branch information
ricardozanini committed Jan 24, 2025
1 parent ff500a0 commit 5b9c322
Show file tree
Hide file tree
Showing 12 changed files with 880 additions and 16 deletions.
95 changes: 95 additions & 0 deletions expr/expr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package expr

import (
"errors"
"fmt"
"github.com/itchyny/gojq"
"strings"
)

// IsStrictExpr returns true if the string is enclosed in `${ }`
func IsStrictExpr(expression string) bool {
return strings.HasPrefix(expression, "${") && strings.HasSuffix(expression, "}")
}

// Sanitize processes the expression to ensure it's ready for evaluation
// It removes `${}` if present and replaces single quotes with double quotes
func Sanitize(expression string) string {
// Remove `${}` enclosure if present
if IsStrictExpr(expression) {
expression = strings.TrimSpace(expression[2 : len(expression)-1])
}

// Replace single quotes with double quotes
expression = strings.ReplaceAll(expression, "'", "\"")

return expression
}

// IsValid tries to parse and check if the given value is a valid expression
func IsValid(expression string) bool {
expression = Sanitize(expression)
_, err := gojq.Parse(expression)
return err == nil
}

// TraverseAndEvaluate recursively processes and evaluates all expressions in a JSON-like structure
func TraverseAndEvaluate(node interface{}, input map[string]interface{}) (interface{}, error) {
switch v := node.(type) {
case map[string]interface{}:
// Traverse map
for key, value := range v {
evaluatedValue, err := TraverseAndEvaluate(value, input)
if err != nil {
return nil, err
}
v[key] = evaluatedValue
}
return v, nil

case []interface{}:
// Traverse array
for i, value := range v {
evaluatedValue, err := TraverseAndEvaluate(value, input)
if err != nil {
return nil, err
}
v[i] = evaluatedValue
}
return v, nil

case string:
// Check if the string is a runtime expression (e.g., ${ .some.path })
if IsStrictExpr(v) {
return EvaluateJQExpression(Sanitize(v), input)
}
return v, nil

default:
// Return other types as-is
return v, nil
}
}

// EvaluateJQExpression evaluates a jq expression against a given JSON input
func EvaluateJQExpression(expression string, input map[string]interface{}) (interface{}, error) {
// Parse the sanitized jq expression
query, err := gojq.Parse(expression)
if err != nil {
return nil, fmt.Errorf("failed to parse jq expression: %s, error: %w", expression, err)
}

// Compile and evaluate the expression
iter := query.Run(input)
result, ok := iter.Next()
if !ok {
return nil, errors.New("no result from jq evaluation")
}

// Check if an error occurred during evaluation
if err, isErr := result.(error); isErr {
return nil, fmt.Errorf("jq evaluation error: %w", err)
}

return result, nil
}
93 changes: 93 additions & 0 deletions impl/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package impl

import (
"context"
"errors"
"sync"
)

type ctxKey string

const executorCtxKey ctxKey = "executorContext"

// ExecutorContext to not confound with Workflow Context as "$context" in the specification.
// This holds the necessary data for the workflow execution within the instance.
type ExecutorContext struct {
mu sync.Mutex
Input map[string]interface{}
Output map[string]interface{}
// Context or `$context` passed through the task executions see https://github.com/serverlessworkflow/specification/blob/main/dsl.md#data-flow
Context map[string]interface{}
}

// SetWorkflowCtx safely sets the $context
func (execCtx *ExecutorContext) SetWorkflowCtx(wfCtx map[string]interface{}) {
execCtx.mu.Lock()
defer execCtx.mu.Unlock()
execCtx.Context = wfCtx
}

// GetWorkflowCtx safely retrieves the $context
func (execCtx *ExecutorContext) GetWorkflowCtx() map[string]interface{} {
execCtx.mu.Lock()
defer execCtx.mu.Unlock()
return execCtx.Context
}

// SetInput safely sets the input map
func (execCtx *ExecutorContext) SetInput(input map[string]interface{}) {
execCtx.mu.Lock()
defer execCtx.mu.Unlock()
execCtx.Input = input
}

// GetInput safely retrieves the input map
func (execCtx *ExecutorContext) GetInput() map[string]interface{} {
execCtx.mu.Lock()
defer execCtx.mu.Unlock()
return execCtx.Input
}

// SetOutput safely sets the output map
func (execCtx *ExecutorContext) SetOutput(output map[string]interface{}) {
execCtx.mu.Lock()
defer execCtx.mu.Unlock()
execCtx.Output = output
}

// GetOutput safely retrieves the output map
func (execCtx *ExecutorContext) GetOutput() map[string]interface{} {
execCtx.mu.Lock()
defer execCtx.mu.Unlock()
return execCtx.Output
}

// UpdateOutput allows adding or updating a single key-value pair in the output map
func (execCtx *ExecutorContext) UpdateOutput(key string, value interface{}) {
execCtx.mu.Lock()
defer execCtx.mu.Unlock()
if execCtx.Output == nil {
execCtx.Output = make(map[string]interface{})
}
execCtx.Output[key] = value
}

// GetOutputValue safely retrieves a single key from the output map
func (execCtx *ExecutorContext) GetOutputValue(key string) (interface{}, bool) {
execCtx.mu.Lock()
defer execCtx.mu.Unlock()
value, exists := execCtx.Output[key]
return value, exists
}

func WithExecutorContext(parent context.Context, wfCtx *ExecutorContext) context.Context {
return context.WithValue(parent, executorCtxKey, wfCtx)
}

func GetExecutorContext(ctx context.Context) (*ExecutorContext, error) {
wfCtx, ok := ctx.Value(executorCtxKey).(*ExecutorContext)
if !ok {
return nil, errors.New("workflow context not found")
}
return wfCtx, nil
}
90 changes: 90 additions & 0 deletions impl/impl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package impl

import (
"context"
"fmt"
"github.com/serverlessworkflow/sdk-go/v3/model"
)

type StatusPhase string

const (
PendingStatus StatusPhase = "pending"
RunningStatus StatusPhase = "running"
WaitingStatus StatusPhase = "waiting"
CancelledStatus StatusPhase = "cancelled"
FaultedStatus StatusPhase = "faulted"
CompletedStatus StatusPhase = "completed"
)

var _ WorkflowRunner = &workflowRunnerImpl{}

type WorkflowRunner interface {
GetWorkflow() *model.Workflow
Run(input map[string]interface{}) (output map[string]interface{}, err error)
}

func NewDefaultRunner(workflow *model.Workflow) WorkflowRunner {
// later we can implement the opts pattern to define context timeout, deadline, cancel, etc.
// also fetch from the workflow model this information
ctx := WithExecutorContext(context.Background(), &ExecutorContext{})
return &workflowRunnerImpl{
Workflow: workflow,
Context: ctx,
}
}

type workflowRunnerImpl struct {
Workflow *model.Workflow
Context context.Context
}

func (wr *workflowRunnerImpl) GetWorkflow() *model.Workflow {
return wr.Workflow
}

// Run the workflow.
// TODO: Sync execution, we think about async later
func (wr *workflowRunnerImpl) Run(input map[string]interface{}) (output map[string]interface{}, err error) {
output = make(map[string]interface{})
if input == nil {
input = make(map[string]interface{})
}

// TODO: validates input via wr.Workflow.Input.Schema

wfCtx, err := GetExecutorContext(wr.Context)
if err != nil {
return nil, err
}
wfCtx.SetInput(input)
wfCtx.SetOutput(output)

// TODO: process wr.Workflow.Input.From, the result we set to WorkFlowCtx
wfCtx.SetWorkflowCtx(input)

// Run tasks
// For each task, execute.
if wr.Workflow.Do != nil {
for _, taskItem := range *wr.Workflow.Do {
switch task := taskItem.Task.(type) {
case *model.SetTask:
exec, err := NewSetTaskExecutor(taskItem.Key, task)
if err != nil {
return nil, err
}
output, err = exec.Exec(wfCtx.GetWorkflowCtx())
if err != nil {
return nil, err
}
wfCtx.SetWorkflowCtx(output)
default:
return nil, fmt.Errorf("workflow does not support task '%T' named '%s'", task, taskItem.Key)
}
}
}

// Process output and return

return output, err
}
43 changes: 43 additions & 0 deletions impl/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package impl

import (
"fmt"
"github.com/serverlessworkflow/sdk-go/v3/expr"
"github.com/serverlessworkflow/sdk-go/v3/model"
)

var _ TaskExecutor = &SetTaskExecutor{}

type TaskExecutor interface {
Exec(input map[string]interface{}) (map[string]interface{}, error)
}

type SetTaskExecutor struct {
Task *model.SetTask
TaskName string
}

func NewSetTaskExecutor(taskName string, task *model.SetTask) (*SetTaskExecutor, error) {
if task == nil || task.Set == nil {
return nil, fmt.Errorf("no set configuration provided for SetTask %s", taskName)
}
return &SetTaskExecutor{
Task: task,
TaskName: taskName,
}, nil
}

func (s *SetTaskExecutor) Exec(input map[string]interface{}) (output map[string]interface{}, err error) {
setObject := deepClone(s.Task.Set)
result, err := expr.TraverseAndEvaluate(setObject, input)
if err != nil {
return nil, fmt.Errorf("failed to execute Set task '%s': %w", s.TaskName, err)
}

output, ok := result.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("expected output to be a map[string]interface{}, but got a different type. Got: %v", result)
}

return output, nil
}
Loading

0 comments on commit 5b9c322

Please sign in to comment.