Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# disable diffs on go.mod and go.sum so GitHub won't render the text changes
go.mod -diff
go.sum -diff
13 changes: 7 additions & 6 deletions eventstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/synadia-labs/rita/codec"
"github.com/synadia-labs/rita/id"
"github.com/synadia-labs/rita/testutil"
"github.com/synadia-labs/rita/types"
Expand Down Expand Up @@ -233,14 +234,14 @@ func TestEventStoreWithRegistry(t *testing.T) {

nc, _ := nats.Connect(srv.ClientURL())

tr, err := types.NewRegistry(map[string]*types.Type{
"order-placed": {
Init: func() any { return &OrderPlaced{} },
tr, err := types.NewInMemRegistry(map[string]types.Type{
"order-placed": types.InMemType{
InitFn: func() any { return &OrderPlaced{} },
},
"order-shipped": {
Init: func() any { return &OrderShipped{} },
"order-shipped": &types.InMemType{
InitFn: func() any { return &OrderShipped{} },
},
})
}, codec.Default)
is.NoErr(err)

r, err := New(t.Context(), nc, TypeRegistry(tr))
Expand Down
24 changes: 15 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,25 +1,31 @@
module github.com/synadia-labs/rita

go 1.24
go 1.25.0

require (
github.com/google/go-cmp v0.7.0
github.com/google/uuid v1.6.0
github.com/nats-io/nats-server/v2 v2.11.6
github.com/nats-io/nats.go v1.44.0
github.com/nats-io/nats-server/v2 v2.11.9
github.com/nats-io/nats.go v1.45.0
github.com/nats-io/nuid v1.0.1
github.com/santhosh-tekuri/jsonschema/v6 v6.0.2
github.com/synadia-io/schema-registry-sdk/go/schemaregistry v0.0.0-20250915095715-75c30f0c9054
github.com/vmihailenco/msgpack/v5 v5.4.1
google.golang.org/protobuf v1.36.6
google.golang.org/protobuf v1.36.9
)

require (
github.com/google/go-tpm v0.9.5 // indirect
github.com/antithesishq/antithesis-sdk-go v0.5.0 // indirect
github.com/google/go-tpm v0.9.6 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/minio/highwayhash v1.0.3 // indirect
github.com/nats-io/jwt/v2 v2.7.4 // indirect
github.com/nats-io/jwt/v2 v2.8.0 // indirect
github.com/nats-io/nkeys v0.4.11 // indirect
github.com/stretchr/testify v1.10.0 // indirect
github.com/synadia-labs/schema-registry v0.0.0-20250912022759-6e8f74fd7d2e // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
golang.org/x/crypto v0.40.0 // indirect
golang.org/x/sys v0.34.0 // indirect
golang.org/x/time v0.12.0 // indirect
golang.org/x/crypto v0.42.0 // indirect
golang.org/x/sys v0.36.0 // indirect
golang.org/x/text v0.29.0 // indirect
golang.org/x/time v0.13.0 // indirect
)
58 changes: 34 additions & 24 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,43 +1,53 @@
github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op h1:+OSa/t11TFhqfrX0EOSqQBDJ0YlpmK0rDSiB19dg9M0=
github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/antithesishq/antithesis-sdk-go v0.5.0 h1:cudCFF83pDDANcXFzkQPUHHedfnnIbUO3JMr9fqwFJs=
github.com/antithesishq/antithesis-sdk-go v0.5.0/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dlclark/regexp2 v1.11.0 h1:G/nrcoOa7ZXlpoa/91N3X7mM3r8eIlMBBJZvsz/mxKI=
github.com/dlclark/regexp2 v1.11.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/go-tpm v0.9.5 h1:ocUmnDebX54dnW+MQWGQRbdaAcJELsa6PqZhJ48KwVU=
github.com/google/go-tpm v0.9.5/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
github.com/google/go-tpm v0.9.6 h1:Ku42PT4LmjDu1H5C5ISWLlpI1mj+Zq7sPGKoRw2XROA=
github.com/google/go-tpm v0.9.6/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q=
github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
github.com/nats-io/jwt/v2 v2.7.4 h1:jXFuDDxs/GQjGDZGhNgH4tXzSUK6WQi2rsj4xmsNOtI=
github.com/nats-io/jwt/v2 v2.7.4/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
github.com/nats-io/nats-server/v2 v2.11.6 h1:4VXRjbTUFKEB+7UoaKL3F5Y83xC7MxPoIONOnGgpkHw=
github.com/nats-io/nats-server/v2 v2.11.6/go.mod h1:2xoztlcb4lDL5Blh1/BiukkKELXvKQ5Vy29FPVRBUYs=
github.com/nats-io/nats.go v1.44.0 h1:ECKVrDLdh/kDPV1g0gAQ+2+m2KprqZK5O/eJAyAnH2M=
github.com/nats-io/nats.go v1.44.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g=
github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
github.com/nats-io/nats-server/v2 v2.11.9 h1:k7nzHZjUf51W1b08xiQih63Rdxh0yr5O4K892Mx5gQA=
github.com/nats-io/nats-server/v2 v2.11.9/go.mod h1:1MQgsAQX1tVjpf3Yzrk3x2pzdsZiNL/TVP3Amhp3CR8=
github.com/nats-io/nats.go v1.45.0 h1:/wGPbnYXDM0pLKFjZTX+2JOw9TQPoIgTFrUaH97giwA=
github.com/nats-io/nats.go v1.45.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/santhosh-tekuri/jsonschema/v6 v6.0.2 h1:KRzFb2m7YtdldCEkzs6KqmJw4nqEVZGK7IN2kJkjTuQ=
github.com/santhosh-tekuri/jsonschema/v6 v6.0.2/go.mod h1:JXeL+ps8p7/KNMjDQk3TCwPpBy0wYklyWTfbkIzdIFU=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/synadia-io/schema-registry-sdk/go/schemaregistry v0.0.0-20250915095715-75c30f0c9054 h1:0avKvAQjiEP1sH3uWjq4PYuzEmF6EpombFEEf3Ogr0w=
github.com/synadia-io/schema-registry-sdk/go/schemaregistry v0.0.0-20250915095715-75c30f0c9054/go.mod h1:JO62cktLMaW5JaSBflMxlRsBcojnAp2IkbMmWMZlw0c=
github.com/synadia-labs/schema-registry v0.0.0-20250912022759-6e8f74fd7d2e h1:M5vE1D6jmI6HCp4o8XVLXyhsH4AVS2GNU7cPmdAUl5s=
github.com/synadia-labs/schema-registry v0.0.0-20250912022759-6e8f74fd7d2e/go.mod h1:Tmrimtwm23ar97KaRI5YVbfJSyf3ugPoLT0nz3qZAos=
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
golang.org/x/crypto v0.40.0 h1:r4x+VvoG5Fm+eJcxMaY8CQM7Lb0l1lsmjGBQ6s8BfKM=
golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY=
golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI=
golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA=
golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE=
golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg=
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k=
golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk=
golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4=
golang.org/x/time v0.13.0 h1:eUlYslOIt32DgYD6utsuUeHs4d7AsEYLuIAdg7FlYgI=
golang.org/x/time v0.13.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
google.golang.org/protobuf v1.36.9 h1:w2gp2mA27hUeUzj9Ex9FBjsBm40zfaDtEWow293U7Iw=
google.golang.org/protobuf v1.36.9/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
4 changes: 2 additions & 2 deletions rita.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type RitaOption interface {
}

// TypeRegistry sets an explicit type registry.
func TypeRegistry(types *types.Registry) RitaOption {
func TypeRegistry(types types.Registry) RitaOption {
return ritaOption(func(o *Rita) error {
o.types = types
return nil
Expand Down Expand Up @@ -65,7 +65,7 @@ type Rita struct {

id id.ID
clock clock.Clock
types *types.Registry
types types.Registry
}

// UnpackEvent unpacks an Event from a NATS message.
Expand Down
173 changes: 173 additions & 0 deletions types/inmem_registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package types

import (
"fmt"
"reflect"

"github.com/synadia-labs/rita/codec"
)

// InMemRegistry is used for transparently marshaling and unmarshaling messages
// and values from their native types to their network/storage representation.
type InMemRegistry struct {
// Codec for marshaling and unmarshaling a values.
codec codec.Codec

// Index of types.
types map[string]Type

// Reflection type to the type name.
rtypes map[reflect.Type]string
}

func (r *InMemRegistry) Codec() codec.Codec {
return r.codec
}

type InMemType struct {
InitFn func() any
}

func (t InMemType) Init() func() any {
return t.InitFn
}

func (r *InMemRegistry) validate(name string, typ Type) error {
if name == "" {
return fmt.Errorf("%w: missing name", ErrTypeNotValid)
}

if err := validateTypeName(name); err != nil {
return err
}

if typ.Init() == nil {
return fmt.Errorf("%w: %s: missing init func", ErrTypeNotValid, name)
}
// Ensure the initialize value is not nil.
v := typ.Init()()
if v == nil {
return fmt.Errorf("%w: %s: init func returns nil", ErrTypeNotValid, name)
}

// Get the Go type in order to transparently serialize to the correct name.
rt := reflect.TypeOf(v)

// Ensure the initialize type is a pointer so that deserialization works.
if rt.Kind() != reflect.Ptr {
return fmt.Errorf("%w: %s: init func must return a pointer value", ErrTypeNotValid, name)
}

// Ensure that the pointer value is a struct type.
if rt.Elem().Kind() != reflect.Struct {
return fmt.Errorf("%w: %s: value type must be a struct", ErrTypeNotValid, name)
}

// Ensure [de]serialization works in the base case.
b, err := r.codec.Marshal(v)
if err != nil {
return fmt.Errorf("%w: %s: failed to marshal with codec: %s", ErrTypeNotValid, name, err)
}

err = r.codec.Unmarshal(b, v)
if err != nil {
return fmt.Errorf("%w: %s: failed to unmarshal with codec: %s", ErrTypeNotValid, name, err)
}

return nil
}

func (r *InMemRegistry) addType(name string, typ Type) {
r.types[name] = typ

// Initialize a value, reflect the type to index.
v := typ.Init()()
rt := reflect.TypeOf(v)

r.rtypes[rt] = name
r.rtypes[rt.Elem()] = name
}

// Init a value given the registered name of the type.
func (r *InMemRegistry) Init(t string) (any, error) {
x, ok := r.types[t]
if !ok {
return nil, fmt.Errorf("%w: %s", ErrTypeNotRegistered, t)
}

v := x.Init()()
return v, nil
}

// Lookup returns the registered name of the type given a value.
func (r *InMemRegistry) Lookup(v any) (string, error) {
rt := reflect.TypeOf(v)
t, ok := r.rtypes[rt]
if !ok {
return "", fmt.Errorf("%w: %s", ErrNoTypeForStruct, rt)
}

return t, nil
}

// Marshal serializes the value to a byte slice. This call
// validates the type is registered and delegates to the codec.
func (r *InMemRegistry) Marshal(v any) ([]byte, error) {
_, err := r.Lookup(v)
if err != nil {
return nil, err
}

b, err := r.codec.Marshal(v)
if err != nil {
return b, fmt.Errorf("%T: marshal error: %w", v, err)
}
return b, nil
}

// Unmarshal deserializes a byte slice into the value. This call
// validates the type is registered and delegates to the codec.
func (r *InMemRegistry) Unmarshal(b []byte, v any) error {
_, err := r.Lookup(v)
if err != nil {
return err
}

err = r.codec.Unmarshal(b, v)
if err != nil {
return fmt.Errorf("%T: unmarshal error: %w", v, err)
}
return nil
}

// UnmarshalType initializes a new value for the registered type,
// unmarshals the byte slice, and returns it.
func (r *InMemRegistry) UnmarshalType(b []byte, t string) (any, error) {
v, err := r.Init(t)
if err != nil {
return nil, err
}
err = r.Unmarshal(b, v)
if err != nil {
return nil, err
}
return v, nil
}

func NewInMemRegistry(types map[string]Type, c codec.Codec) (Registry, error) {
r := &InMemRegistry{
codec: c,
types: make(map[string]Type),
rtypes: make(map[reflect.Type]string),
}

for n, t := range types {
err := r.validate(n, t)
if err != nil {
return nil, err
}
r.addType(n, t)
}

return r, nil
}
Loading
Loading