From 88c1f5bc12ce99a374d6b27a24502b3149de519e Mon Sep 17 00:00:00 2001 From: Tony Holdstock-Brown Date: Wed, 1 Nov 2023 17:27:47 -0700 Subject: [PATCH] Change hashing scheme to match TS SDK v3 (#23) * Change hashing scheme to match TS SDK v3 This updates the hashing scheme for TS SDK v3 compatibility (v1 of our internal API). * Suffix on > 1 iteration * Update hashing scheme and set IDs everywhere * Add sleep opts --- examples/main.go | 72 ++++++++++++++++++++++++++++++++++ funcs.go | 18 +++++---- handler_test.go | 22 +++++------ internal/sdkrequest/manager.go | 27 ++++++------- step/run.go | 17 +++++--- step/run_test.go | 24 ++++++------ step/sleep.go | 16 ++++++-- step/wait_for_event.go | 8 ++-- 8 files changed, 145 insertions(+), 59 deletions(-) create mode 100644 examples/main.go diff --git a/examples/main.go b/examples/main.go new file mode 100644 index 00000000..8ba4f0c1 --- /dev/null +++ b/examples/main.go @@ -0,0 +1,72 @@ +package main + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/inngest/inngestgo" + "github.com/inngest/inngestgo/step" +) + +func main() { + h := inngestgo.NewHandler("core", inngestgo.HandlerOpts{}) + f := inngestgo.CreateFunction( + inngestgo.FunctionOpts{ + ID: "account-created", + }, + // Run on every api/account.created event. + inngestgo.EventTrigger("api/account.created", nil), + UserCreated, + ) + h.Register(f) + http.ListenAndServe(":8080", h) +} + +func UserCreated(ctx context.Context, input inngestgo.Input[any]) (any, error) { + // Sleep for a second + step.Sleep(ctx, "initial-delay", time.Second) + + // Run a step which emails the user. + step.Run(ctx, "on-user-created", func(ctx context.Context) (string, error) { + return "", nil + }) + + fn, err := step.WaitForEvent[FunctionCreatedEvent](ctx, "wait-for-activity", step.WaitForEventOpts{ + Name: "Wait for a function to be created", + Event: "api/function.created", + If: inngestgo.StrPtr("async.data.user_id == event.data.user_id"), + Timeout: time.Hour * 72, + }) + if err == step.ErrEventNotReceived { + // A function wasn't created within 3 days. + return nil, nil + } + + // The function event is fully typed :) + fmt.Println(fn.Data.FunctionID) + + return nil, nil +} + +// AccountCreatedEvent represents the fully defined event received when an account is created. +// +// This is shorthand for defining a new Inngest-conforming struct: +// +// type AccountCreatedEvent struct { +// Name string `json:"name"` +// Data AccountCreatedEventData `json:"data"` +// User any `json:"user"` +// Timestamp int64 `json:"ts,omitempty"` +// Version string `json:"v,omitempty"` +// } +type AccountCreatedEvent inngestgo.GenericEvent[AccountCreatedEventData, any] +type AccountCreatedEventData struct { + AccountID string +} + +type FunctionCreatedEvent inngestgo.GenericEvent[FunctionCreatedEventData, any] +type FunctionCreatedEventData struct { + FunctionID string +} diff --git a/funcs.go b/funcs.go index 36a764b6..1b305bb4 100644 --- a/funcs.go +++ b/funcs.go @@ -10,10 +10,12 @@ import ( ) type FunctionOpts struct { - Name string // ID is an optional function ID. If not specified, the ID // will be auto-generated by lowercasing and slugging the name. - ID *string + ID string + // Name represents a human-readable function name. + Name string + Priority *inngest.Priority Concurrency []inngest.Concurrency Idempotency *string @@ -22,8 +24,7 @@ type FunctionOpts struct { Debounce *Debounce // RateLimit allows the function to be rate limited. - RateLimit *RateLimit - + RateLimit *RateLimit BatchEvents *inngest.EventBatchConfig } @@ -116,10 +117,11 @@ func CreateFunction[T any]( return sf } -func EventTrigger(name string) inngest.Trigger { +func EventTrigger(name string, expression *string) inngest.Trigger { return inngest.Trigger{ EventTrigger: &inngest.EventTrigger{ - Event: name, + Event: name, + Expression: expression, }, } } @@ -197,10 +199,10 @@ func (s servableFunc) Config() FunctionOpts { } func (s servableFunc) Slug() string { - if s.fc.ID == nil { + if s.fc.ID == "" { return slug.Make(s.fc.Name) } - return *s.fc.ID + return s.fc.ID } func (s servableFunc) Name() string { diff --git a/handler_test.go b/handler_test.go index af1034d8..3943b11e 100644 --- a/handler_test.go +++ b/handler_test.go @@ -43,21 +43,21 @@ func TestRegister(t *testing.T) { FunctionOpts{ Name: "my func name", }, - EventTrigger("test/event.a"), + EventTrigger("test/event.a", nil), func(ctx context.Context, input Input[EventA]) (any, error) { return nil, nil }, ) b := CreateFunction( FunctionOpts{Name: "another func"}, - EventTrigger("test/event.b"), + EventTrigger("test/event.b", nil), func(ctx context.Context, input Input[EventB]) (any, error) { return nil, nil }, ) c := CreateFunction( FunctionOpts{Name: "batch func", BatchEvents: &inngest.EventBatchConfig{MaxSize: 20, Timeout: "10s"}}, - EventTrigger("test/batch.a"), + EventTrigger("test/batch.a", nil), func(ctx context.Context, input Input[EventC]) (any, error) { return nil, nil }, @@ -87,7 +87,7 @@ func TestInvoke(t *testing.T) { } a := CreateFunction( FunctionOpts{Name: "my func name"}, - EventTrigger("test/event.a"), + EventTrigger("test/event.a", nil), func(ctx context.Context, event Input[EventA]) (any, error) { require.EqualValues(t, event.Event, input) return resp, nil @@ -120,7 +120,7 @@ func TestInvoke(t *testing.T) { } a := CreateFunction( FunctionOpts{Name: "my func name", BatchEvents: &inngest.EventBatchConfig{MaxSize: 5, Timeout: "10s"}}, - EventTrigger("test/event.a"), + EventTrigger("test/event.a", nil), func(ctx context.Context, event Input[EventA]) (any, error) { require.EqualValues(t, event.Event, input) require.EqualValues(t, len(event.Events), 5) @@ -153,7 +153,7 @@ func TestInvoke(t *testing.T) { } a := CreateFunction( FunctionOpts{Name: "my func name"}, - EventTrigger("test/event.a"), + EventTrigger("test/event.a", nil), func(ctx context.Context, event Input[*EventA]) (any, error) { require.NotNil(t, event.Event) require.EqualValues(t, *event.Event, input) @@ -186,7 +186,7 @@ func TestInvoke(t *testing.T) { } a := CreateFunction( FunctionOpts{Name: "my func name"}, - EventTrigger("test/event.a"), + EventTrigger("test/event.a", nil), func(ctx context.Context, event Input[any]) (any, error) { require.NotNil(t, event.Event) val, ok := event.Event.(map[string]any) @@ -224,7 +224,7 @@ func TestInvoke(t *testing.T) { } a := CreateFunction( FunctionOpts{Name: "my func name"}, - EventTrigger("test/event.a"), + EventTrigger("test/event.a", nil), func(ctx context.Context, event Input[map[string]any]) (any, error) { require.NotNil(t, event.Event) val := event.Event @@ -256,7 +256,7 @@ func TestInvoke(t *testing.T) { // before deploying to Inngest. CreateFunction( FunctionOpts{Name: "my func name"}, - EventTrigger("test/event.a"), + EventTrigger("test/event.a", nil), func(ctx context.Context, event Input[io.Reader]) (any, error) { return nil, nil }, @@ -282,7 +282,7 @@ func TestServe(t *testing.T) { var called int32 a := CreateFunction( FunctionOpts{Name: "My servable function!"}, - EventTrigger("test/event.a"), + EventTrigger("test/event.a", nil), func(ctx context.Context, input Input[EventA]) (any, error) { atomic.AddInt32(&called, 1) require.EqualValues(t, event, input.Event) @@ -347,7 +347,7 @@ func TestSteps(t *testing.T) { a := CreateFunction( FunctionOpts{Name: "step function"}, - EventTrigger("test/event.a"), + EventTrigger("test/event.a", nil), func(ctx context.Context, input Input[EventA]) (any, error) { atomic.AddInt32(&fnCt, 1) stepA := step.Run(ctx, "First step", func(ctx context.Context) (map[string]any, error) { diff --git a/internal/sdkrequest/manager.go b/internal/sdkrequest/manager.go index 211a1553..f331e0da 100644 --- a/internal/sdkrequest/manager.go +++ b/internal/sdkrequest/manager.go @@ -8,7 +8,6 @@ import ( "fmt" "sync" - "github.com/gowebpki/jcs" "github.com/inngest/inngest/pkg/enums" "github.com/inngest/inngest/pkg/execution/state" ) @@ -36,7 +35,7 @@ type InvocationManager interface { Step(op UnhashedOp) (json.RawMessage, bool) // NewOp generates a new unhashed op for creating a state.GeneratorOpcode. This // is required for future execution of a step. - NewOp(op enums.Opcode, name string, opts map[string]any) UnhashedOp + NewOp(op enums.Opcode, id string, opts map[string]any) UnhashedOp } // NewManager returns an InvocationManager to manage the incoming executor request. This @@ -117,12 +116,11 @@ func (r *requestCtxManager) Step(op UnhashedOp) (json.RawMessage, bool) { return val, ok } -func (r *requestCtxManager) NewOp(op enums.Opcode, name string, opts map[string]any) UnhashedOp { +func (r *requestCtxManager) NewOp(op enums.Opcode, id string, opts map[string]any) UnhashedOp { r.l.Lock() defer r.l.Unlock() - key := fmt.Sprintf("%s-%s", op, name) - n, ok := r.indexes[key] + n, ok := r.indexes[id] if ok { // We have an index already, so increase the counter as we're // adding to this key. @@ -130,10 +128,10 @@ func (r *requestCtxManager) NewOp(op enums.Opcode, name string, opts map[string] } // Update indexes for each particualar key. - r.indexes[key] = n + r.indexes[id] = n return UnhashedOp{ - Name: name, + ID: id, Op: op, Opts: opts, Pos: uint(n), @@ -141,7 +139,7 @@ func (r *requestCtxManager) NewOp(op enums.Opcode, name string, opts map[string] } type UnhashedOp struct { - Name string `json:"name"` + ID string `json:"id"` Op enums.Opcode `json:"op"` Opts map[string]any `json:"opts"` Pos uint `json:"pos"` @@ -149,15 +147,12 @@ type UnhashedOp struct { } func (u UnhashedOp) Hash() (string, error) { - j, err := json.Marshal(u) - if err != nil { - return "", err - } - byt, err := jcs.Transform(j) - if err != nil { - return "", err + input := u.ID + if u.Pos > 0 { + // We only suffix the counter if there's > 1 operation with the same ID. + input = fmt.Sprintf("%s:%d", u.ID, u.Pos) } - sum := sha1.Sum(byt) + sum := sha1.Sum([]byte(input)) return hex.EncodeToString(sum[:]), nil } diff --git a/step/run.go b/step/run.go index 876c0f5a..5ec97348 100644 --- a/step/run.go +++ b/step/run.go @@ -10,17 +10,24 @@ import ( "github.com/inngest/inngest/pkg/execution/state" ) +type RunOpts struct { + // ID represents the optional step name. + ID string + // Name represents the optional step name. + Name string +} + // StepRun runs any code reliably, with retries, returning the resulting data. If this // fails the function stops. // // TODO: Allow users to catch single step errors. func Run[T any]( ctx context.Context, - name string, + id string, f func(ctx context.Context) (T, error), ) T { mgr := preflight(ctx) - op := mgr.NewOp(enums.OpcodeStep, name, nil) + op := mgr.NewOp(enums.OpcodeStep, id, nil) if val, ok := mgr.Step(op); ok { // This step has already ran as we have state for it. @@ -28,7 +35,7 @@ func Run[T any]( ft := reflect.TypeOf(f) v := reflect.New(ft.Out(0)).Interface() if err := json.Unmarshal(val, v); err != nil { - mgr.SetErr(fmt.Errorf("error unmarshalling state for step '%s': %w", name, err)) + mgr.SetErr(fmt.Errorf("error unmarshalling state for step '%s': %w", id, err)) panic(ControlHijack{}) } val, _ := reflect.ValueOf(v).Elem().Interface().(T) @@ -47,13 +54,13 @@ func Run[T any]( byt, err := json.Marshal(result) if err != nil { - mgr.SetErr(fmt.Errorf("unable to marshal run respone for '%s': %w", name, err)) + mgr.SetErr(fmt.Errorf("unable to marshal run respone for '%s': %w", id, err)) } mgr.AppendOp(state.GeneratorOpcode{ ID: op.MustHash(), Op: enums.OpcodeStep, - Name: name, + Name: id, Data: byt, }) panic(ControlHijack{}) diff --git a/step/run_test.go b/step/run_test.go index da3b7589..68556b51 100644 --- a/step/run_test.go +++ b/step/run_test.go @@ -44,8 +44,8 @@ func TestStep(t *testing.T) { // indexes name = "struct" op := sdkrequest.UnhashedOp{ - Op: enums.OpcodeStep, - Name: name, + Op: enums.OpcodeStep, + ID: name, } byt, err := json.Marshal(expected) @@ -64,8 +64,8 @@ func TestStep(t *testing.T) { // indexes name = "struct ptrs" op := sdkrequest.UnhashedOp{ - Op: enums.OpcodeStep, - Name: name, + Op: enums.OpcodeStep, + ID: name, } byt, err := json.Marshal(expected) @@ -84,8 +84,8 @@ func TestStep(t *testing.T) { // indexes name = "slices" op := sdkrequest.UnhashedOp{ - Op: enums.OpcodeStep, - Name: name, + Op: enums.OpcodeStep, + ID: name, } byt, err := json.Marshal([]response{expected}) @@ -104,8 +104,8 @@ func TestStep(t *testing.T) { // indexes name = "ints" op := sdkrequest.UnhashedOp{ - Op: enums.OpcodeStep, - Name: name, + Op: enums.OpcodeStep, + ID: name, } byt, err := json.Marshal(646) @@ -124,8 +124,8 @@ func TestStep(t *testing.T) { // indexes name = "nil" op := sdkrequest.UnhashedOp{ - Op: enums.OpcodeStep, - Name: name, + Op: enums.OpcodeStep, + ID: name, } byt, err := json.Marshal(nil) @@ -156,8 +156,8 @@ func TestStep(t *testing.T) { }() op := sdkrequest.UnhashedOp{ - Op: enums.OpcodeStep, - Name: name, + Op: enums.OpcodeStep, + ID: name, } require.NotEmpty(t, mgr.Ops()) diff --git a/step/sleep.go b/step/sleep.go index de8ef135..6ecabab2 100644 --- a/step/sleep.go +++ b/step/sleep.go @@ -9,10 +9,15 @@ import ( str2duration "github.com/xhit/go-str2duration/v2" ) -func Sleep(ctx context.Context, duration time.Duration) { +type SleepOpts struct { + ID string + // Name represents the optional step name. + Name string +} + +func Sleep(ctx context.Context, id string, duration time.Duration) { mgr := preflight(ctx) - name := str2duration.String(duration) - op := mgr.NewOp(enums.OpcodeSleep, name, nil) + op := mgr.NewOp(enums.OpcodeSleep, id, nil) if _, ok := mgr.Step(op); ok { // We've already slept. return @@ -20,7 +25,10 @@ func Sleep(ctx context.Context, duration time.Duration) { mgr.AppendOp(state.GeneratorOpcode{ ID: op.MustHash(), Op: enums.OpcodeSleep, - Name: name, + Name: id, + Opts: map[string]any{ + "duration": str2duration.String(duration), + }, }) panic(ControlHijack{}) } diff --git a/step/wait_for_event.go b/step/wait_for_event.go index 89632cb1..d238df5e 100644 --- a/step/wait_for_event.go +++ b/step/wait_for_event.go @@ -17,6 +17,8 @@ var ( ) type WaitForEventOpts struct { + // Name represents the optional step name. + Name string // Event is the event name to wait for. Event string // Timeout is how long to wait. We must always timebound event lsiteners. @@ -25,7 +27,7 @@ type WaitForEventOpts struct { If *string `json:"if"` } -func WaitForEvent[T any](ctx context.Context, opts WaitForEventOpts) (T, error) { +func WaitForEvent[T any](ctx context.Context, id string, opts WaitForEventOpts) (T, error) { mgr := preflight(ctx) args := map[string]any{ "timeout": str2duration.String(opts.Timeout), @@ -34,7 +36,7 @@ func WaitForEvent[T any](ctx context.Context, opts WaitForEventOpts) (T, error) args["if"] = *opts.If } - op := mgr.NewOp(enums.OpcodeWaitForEvent, opts.Event, args) + op := mgr.NewOp(enums.OpcodeWaitForEvent, id, args) if val, ok := mgr.Step(op); ok { var output T if val == nil || bytes.Equal(val, []byte{0x6e, 0x75, 0x6c, 0x6c}) { @@ -50,7 +52,7 @@ func WaitForEvent[T any](ctx context.Context, opts WaitForEventOpts) (T, error) mgr.AppendOp(state.GeneratorOpcode{ ID: op.MustHash(), Op: op.Op, - Name: op.Name, + Name: opts.Name, Opts: op.Opts, }) panic(ControlHijack{})