From 1795902c0771f413b003416d03ba4ae422101911 Mon Sep 17 00:00:00 2001 From: Kamesh Sampath Date: Fri, 8 Dec 2023 12:23:07 +0530 Subject: [PATCH] (fix): use Schema Registry - use schema registry to register the todo protobuf - use sr to encode/decode messages --- .gitignore | 1 + cmd/client/main.go | 2 - cmd/server/main.go | 110 +++++++++++++++++++++++++++++++++++++--- config/config.go | 7 ++- go.mod | 10 ++-- go.sum | 22 ++++---- impl/server.go | 36 +++++++++++++ impl/todo_impl.go | 123 +++++++++++++++++++++++++++++++++++++++++++++ impl/types.go | 45 +++++++++++++++++ 9 files changed, 332 insertions(+), 24 deletions(-) create mode 100644 impl/server.go create mode 100644 impl/todo_impl.go create mode 100644 impl/types.go diff --git a/.gitignore b/.gitignore index cde0123..3c4b090 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ dist/ +cmd/scratches \ No newline at end of file diff --git a/cmd/client/main.go b/cmd/client/main.go index 39d7789..10291ef 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -14,8 +14,6 @@ import ( ) func main() { - //Jai Guru - logger, _ := zap.NewDevelopment() defer logger.Sync() log := logger.Sugar() diff --git a/cmd/server/main.go b/cmd/server/main.go index a6f5f68..600878f 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -1,20 +1,45 @@ package main import ( + "context" + "fmt" + "io" + "net/http" + + "github.com/kameshsampath/demo-protos/golang/todo" "github.com/kameshsampath/todo-app/config" - "github.com/kameshsampath/todo-app/internal/impl" + "github.com/kameshsampath/todo-app/impl" "github.com/twmb/franz-go/pkg/kgo" + "github.com/twmb/franz-go/pkg/sr" "go.uber.org/zap" + "google.golang.org/protobuf/proto" ) -func main() { - //Jai Guru +type serviceSchemas struct { + id int + typeValue any + index int +} + +var ( + protoMarshallFn = func(a any) ([]byte, error) { + return proto.Marshal(a.(proto.Message)) + } + protoUnMarshallFn = func(b []byte, a any) error { + return proto.Unmarshal(b, a.(proto.Message)) + } + log *zap.SugaredLogger +) + +func init() { logger, _ := zap.NewDevelopment() defer logger.Sync() - log := logger.Sugar() + log = logger.Sugar() +} - config := config.New() +func main() { + config := config.New() client, err := kgo.NewClient( kgo.SeedBrokers(config.Seeds...), kgo.ConsumeTopics(config.Topics...), @@ -22,13 +47,86 @@ func main() { kgo.ConsumerGroup(config.ConsumerGroupID), kgo.AllowAutoTopicCreation(), ) + if err != nil { + log.Fatal(err) + } + ss, err := createSchema(*config) if err != nil { log.Fatal(err) } + // Add all schema type that needs to be registered with srede + // and used when encoding and decoding proto messages + serde := registerSchemas( + []serviceSchemas{ + { + id: ss.ID, + index: 0, + typeValue: &todo.Task{}, + }, + }, + ) + + server := impl.New( + impl.WithClient(client), + impl.WithConfig(config), + impl.WithSerde(serde), + ) - server := impl.New(client, config) if err := server.Run(); err != nil { log.Fatal(err) } } + +// createSchema creates(registers) the schemas with the SchemaRegistry +func createSchema(config config.Config) (*sr.SubjectSchema, error) { + rcl, err := sr.NewClient(sr.URLs(config.SchemaRegistry)) + if err != nil { + return nil, err + } + + res, err := http.Get(config.SchemaURL) + if err != nil { + return nil, err + } + defer res.Body.Close() + + if res.StatusCode == http.StatusOK { + sb, err := io.ReadAll(res.Body) + log.Debugf("%s", string(sb)) + if err != nil { + return nil, err + } + + // if err := rcl.DeleteSchema(context.Background(), fmt.Sprintf("%s-value", config.DefaultProducerTopic()), -1, sr.HardDelete); err != nil { + // log.Fatal(err) + // } + // os.Exit(1) + + ss, err := rcl.CreateSchema(context.Background(), fmt.Sprintf("%s-value", config.DefaultProducerTopic()), sr.Schema{ + Type: sr.TypeProtobuf, + Schema: string(sb), + }) + if err != nil { + return nil, err + } + + return &ss, nil + } + return nil, fmt.Errorf("unable to read schema from url, '%s'", config.SchemaURL) +} + +// registerSchemas registers all the schema and its corresponding encoding/decoding function +func registerSchemas(serviceSchemas []serviceSchemas) *sr.Serde { + serde := new(sr.Serde) + for _, schema := range serviceSchemas { + serde.Register( + schema.id, + schema.typeValue, + sr.DecodeFn(protoUnMarshallFn), + sr.EncodeFn(protoMarshallFn), + sr.Index(schema.index), + ) + } + return serde +} diff --git a/config/config.go b/config/config.go index 915e6b8..b4739b6 100644 --- a/config/config.go +++ b/config/config.go @@ -5,13 +5,15 @@ import ( "go.uber.org/zap" ) -// Config sets the configuration for the gRPC service +// Config sets the configuration for the gRPC server type Config struct { Env string `env:"ENV" envDefault:"dev"` ConsumerGroupID string `env:"CONSUMER_GROUP_ID"` Port uint16 `env:"PORT" envDefault:"9090"` - Seeds []string `env:"BROKERS" envSeparator:","` + Seeds []string `env:"BROKERS" envSeparator:"," envDefault:"localhost:19092"` Topics []string `env:"TOPICS" envSeparator:","` + SchemaRegistry string `env:"SCHEMA_REGISTRY" envDefault:"localhost:18081"` + SchemaURL string `env:"SCHEMA_URL" envDefault:"https://raw.githubusercontent.com/kameshsampath/demo-protos/main/todo/todo.proto"` } var log *zap.SugaredLogger @@ -32,6 +34,7 @@ func New() *Config { return config } +// DefaultProducerTopic gets the default topic that will be used as the producer topic func (c *Config) DefaultProducerTopic() string { return c.Topics[0] } diff --git a/go.mod b/go.mod index 5c17ad3..cb15eff 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/caarlos0/env/v10 v10.0.0 github.com/kameshsampath/demo-protos/golang/todo v0.0.0-20231206022715-1afc44aab8bf github.com/twmb/franz-go v1.15.2 + github.com/twmb/franz-go/pkg/sr v0.0.0-20231206062516-c09dc92d2db1 go.uber.org/zap v1.26.0 google.golang.org/grpc v1.59.0 google.golang.org/protobuf v1.31.0 @@ -15,10 +16,11 @@ require ( github.com/golang/protobuf v1.5.3 // indirect github.com/klauspost/compress v1.16.7 // indirect github.com/pierrec/lz4/v4 v4.1.18 // indirect + github.com/stretchr/testify v1.8.4 // indirect github.com/twmb/franz-go/pkg/kmsg v1.7.0 // indirect - go.uber.org/multierr v1.10.0 // indirect - golang.org/x/net v0.14.0 // indirect - golang.org/x/sys v0.11.0 // indirect - golang.org/x/text v0.12.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + golang.org/x/net v0.17.0 // indirect + golang.org/x/sys v0.13.0 // indirect + golang.org/x/text v0.13.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect ) diff --git a/go.sum b/go.sum index d902bcd..2b9e38e 100644 --- a/go.sum +++ b/go.sum @@ -16,24 +16,26 @@ github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/twmb/franz-go v1.15.2 h1:mt3i7bTAp4GH/kMJiGAikJQUlG+UsCwxCmEy1CcAKYo= github.com/twmb/franz-go v1.15.2/go.mod h1:aos+d/UBuigWkOs+6WoqEPto47EvC2jipLAO5qrAu48= github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqjJYoij2E= github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw= +github.com/twmb/franz-go/pkg/sr v0.0.0-20231206062516-c09dc92d2db1 h1:Ork4lD/g4uG5LYVNSiZ5XY4oy/k0c2do3brj0TxcU6w= +github.com/twmb/franz-go/pkg/sr v0.0.0-20231206062516-c09dc92d2db1/go.mod h1:egX+kicq83hpztv3PRCXKLNO132Ol9JTAJOCRZcqUxI= go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= -go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= -go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= -golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14= -golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= -golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= -golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc= -golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d h1:uvYuEyMHKNt+lT4K3bN6fGswmK8qSvcreM3BwjDh+y4= google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M= diff --git a/impl/server.go b/impl/server.go new file mode 100644 index 0000000..0a4939b --- /dev/null +++ b/impl/server.go @@ -0,0 +1,36 @@ +package impl + +import ( + "fmt" + "net" + + "github.com/kameshsampath/demo-protos/golang/todo" + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/reflection" +) + +// Run runs the gRPC server +func (s *Server) Run() error { + logger, _ := zap.NewDevelopment() + defer logger.Sync() + log := logger.Sugar() + + config := s.config + listen, err := net.Listen("tcp", fmt.Sprintf(":%d", config.Port)) + if err != nil { + return fmt.Errorf("error starting server with port %d,%v", config.Port, err) + } + server := grpc.NewServer() + todo.RegisterTodoServer(server, s) + log.Infof("Server started on port %d", config.Port) + // required for grpcurl + if config.Env == "dev" { + reflection.Register(server) + } + + if err := server.Serve(listen); err != nil { + return fmt.Errorf("error starting server,%v", err) + } + return nil +} diff --git a/impl/todo_impl.go b/impl/todo_impl.go new file mode 100644 index 0000000..b5b8032 --- /dev/null +++ b/impl/todo_impl.go @@ -0,0 +1,123 @@ +package impl + +import ( + "context" + "time" + + "github.com/kameshsampath/demo-protos/golang/todo" + "github.com/twmb/franz-go/pkg/kgo" + "go.uber.org/zap" + "google.golang.org/protobuf/types/known/emptypb" +) + +type result struct { + record *kgo.Record + errors []kgo.FetchError +} + +var log *zap.SugaredLogger + +func init() { + logger, _ := zap.NewDevelopment() + defer logger.Sync() + log = logger.Sugar() +} + +// AddTodo implements todo.TodoServer. +func (s *Server) AddTodo(ctx context.Context, req *todo.TodoAddRequest) (*todo.TodoResponse, error) { + + tctx, cancel := context.WithTimeout(ctx, 3*time.Second) + defer cancel() + + r := &kgo.Record{ + Key: []byte(req.Task.Title), + Value: s.serde.MustEncode(req.Task), + } + if err := s.client.ProduceSync(tctx, r).FirstErr(); err != nil { + return nil, err + } + + return &todo.TodoResponse{ + Task: req.Task, + Partition: r.Partition, + Offset: r.Offset, + }, nil +} + +// TodoList implements todo.TodoServer. +func (s *Server) TodoList(empty *emptypb.Empty, stream todo.Todo_TodoListServer) error { + ch := make(chan result) + go func() { + s.poll(ch) + }() + + for { + select { + case r := <-ch: + { + if errs := r.errors; len(errs) > 0 { + var errors = make([]*todo.Error, len(errs)) + for _, err := range errs { + log.Debugf("Error Details", + "Topic", err.Topic, + "Partition", err.Partition, + "Error", err.Err, + ) + errors = append(errors, &todo.Error{ + Topic: err.Topic, + Partition: err.Partition, + Message: err.Err.Error(), + }) + } + stream.Send(&todo.TodoListResponse{ + Response: &todo.TodoListResponse_Errors{Errors: &todo.Errors{ + Error: errors, + }}, + }) + } + b := r.record.Value + task := new(todo.Task) + if err := s.serde.Decode(b, task); err != nil { + //Skip Sending invalid data, just log the error + log.Errorw("Error Decoding task", + "Data", string(b), + "Error", err.Error()) + } else { + stream.Send(&todo.TodoListResponse{ + Response: &todo.TodoListResponse_Todo{ + Todo: &todo.TodoResponse{ + Task: task, + Partition: r.record.Partition, + Offset: r.record.Offset, + }, + }, + }) + } + } + } + } +} + +// poll fetches the record from the backend and adds that the channel +func (s *Server) poll(ch chan result) { + log.Debugf("Started to poll topic:%s", s.config.DefaultProducerTopic()) + //Consumer + for { + fetches := s.client.PollFetches(context.Background()) + if errs := fetches.Errors(); len(errs) > 0 { + ch <- result{ + errors: errs, + } + } + + fetches.EachPartition(func(p kgo.FetchTopicPartition) { + for _, r := range p.Records { + ch <- result{ + record: r, + } + } + }) + } +} + +var _ todo.TodoServer = (*Server)(nil) diff --git a/impl/types.go b/impl/types.go new file mode 100644 index 0000000..48d7595 --- /dev/null +++ b/impl/types.go @@ -0,0 +1,45 @@ +package impl + +import ( + "github.com/kameshsampath/demo-protos/golang/todo" + "github.com/kameshsampath/todo-app/config" + "github.com/twmb/franz-go/pkg/kgo" + "github.com/twmb/franz-go/pkg/sr" +) + +type Server struct { + client *kgo.Client + config *config.Config + serde *sr.Serde + todo.UnimplementedTodoServer +} + +type ServerOpt func(*Server) + +// Creates a new instance of the server with given configuration +func New(options ...ServerOpt) *Server { + s := &Server{} + for _, opt := range options { + opt(s) + } + + return s +} + +func WithClient(client *kgo.Client) ServerOpt { + return func(s *Server) { + s.client = client + } +} + +func WithSerde(serde *sr.Serde) ServerOpt { + return func(s *Server) { + s.serde = serde + } +} + +func WithConfig(config *config.Config) ServerOpt { + return func(s *Server) { + s.config = config + } +}