diff --git a/.env b/.env index ed6f8a8..0f34f45 100644 --- a/.env +++ b/.env @@ -1,5 +1,6 @@ KAFKA_BROKERS=kafka:9092 +KAFKA_CONSUMER_GROUP_LOGIN=auth.login.consumergroup KAFKA_CONSUMER_TOPIC_LOGIN=auth.login.response KAFKA_PRODUCER_TOPIC_LOGIN=auth.login.request diff --git a/gql/resolver/accesstoken.go b/gql/resolver/accesstoken.go index dcb3a40..063c96a 100644 --- a/gql/resolver/accesstoken.go +++ b/gql/resolver/accesstoken.go @@ -39,7 +39,7 @@ var AccessTokenResolver = func(params graphql.ResolveParams) (interface{}, error err = errors.Wrap(err, "Error renewing AccessToken") return nil, err } - return &model.AuthResponse{ + return &model.AuthTokens{ AccessToken: at, }, nil } diff --git a/gql/resolver/authhandler.go b/gql/resolver/authhandler.go deleted file mode 100644 index 5728903..0000000 --- a/gql/resolver/authhandler.go +++ /dev/null @@ -1,211 +0,0 @@ -package resolver - -import ( - "context" - "encoding/json" - "log" - "time" - - "github.com/TerrexTech/go-apigateway/gwerrors" - - "github.com/Shopify/sarama" - "github.com/TerrexTech/go-apigateway/auth" - "github.com/TerrexTech/go-apigateway/kafka" - "github.com/TerrexTech/go-apigateway/model" - esmodel "github.com/TerrexTech/go-eventstore-models/model" - "github.com/TerrexTech/uuuid" - "github.com/pkg/errors" -) - -// authResponse is the GraphQL response on successful authentication. -type authResponse struct { - authResponse *model.AuthResponse - authErr *gwerrors.KRError -} - -// AccessTokenResolver is the resolver for AccessToken type. -var authHandler = func( - ts auth.TokenStoreI, - credentials map[string]interface{}, - pio *kafka.ProducerIO, - cio *kafka.ConsumerIO, -) (interface{}, error) { - // Marshal User-credentials - credentialsJSON, err := json.Marshal(credentials) - if err != nil { - err = errors.Wrap(err, "LoginResolver: Error marshalling credentials into JSON") - return nil, err - } - - // CorrelationID - cid, err := uuuid.NewV4() - if err != nil { - err = errors.Wrap(err, "LoginResolver: Error generating UUID for cid") - return nil, err - } - // Publish Auth-Request on Kafka Topic - go func() { - pio.ProducerInput() <- &esmodel.KafkaResponse{ - CorrelationID: cid, - Input: credentialsJSON, - Topic: pio.ID(), - } - }() - - // Timeout Context - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - -authResponseLoop: - // Check auth-response messages for matching CorrelationID and return result - for { - select { - case <-ctx.Done(): - break authResponseLoop - case msg := <-cio.ConsumerMessages(): - cio.MarkOffset() <- msg - authRes := handleAuthResponse(msg, ts, cid) - if authRes != nil { - if authRes.authErr == nil { - return authRes.authResponse, nil - } - log.Println("ppppppppppppppppppppppp") - return nil, errors.New(authRes.authErr.Error()) - } - } - } - - return nil, errors.New("Timed out") -} - -func handleAuthResponse( - msg *sarama.ConsumerMessage, - ts auth.TokenStoreI, - cid uuuid.UUID, -) *authResponse { - user, krerr := parseKafkaResponse(msg, cid) - if krerr != nil { - err := errors.Wrap(krerr, "Error authenticating user") - krerr.Err = err - return &authResponse{ - authResponse: nil, - authErr: krerr, - } - } - - if user == nil { - return nil - } - - at, err := genAccessToken(user) - if err != nil { - err = errors.Wrap(err, "Error generating AccessToken") - krerr := gwerrors.NewKRError(err, gwerrors.InternalError, err.Error()) - return &authResponse{ - authResponse: nil, - authErr: krerr, - } - } - - rt, err := genRefreshToken(ts, user) - if err != nil { - err = errors.Wrap(err, "Error generating RefreshToken") - krerr := gwerrors.NewKRError(err, gwerrors.InternalError, err.Error()) - return &authResponse{ - authResponse: nil, - authErr: krerr, - } - } - - return &authResponse{ - authResponse: &model.AuthResponse{ - AccessToken: at, - RefreshToken: rt, - }, - authErr: nil, - } -} - -func parseKafkaResponse( - msg *sarama.ConsumerMessage, cid uuuid.UUID, -) (*model.User, *gwerrors.KRError) { - log.Println("aaaaaaaaaaaaaaaaaaaaaa") - kr := &esmodel.KafkaResponse{} - err := json.Unmarshal(msg.Value, kr) - if err != nil { - err = errors.Wrap(err, "LoginResponseHandler: Error unmarshalling message into KafkaResponse") - log.Println(err) - return nil, nil - } - log.Println("zzzzzzzzzzzzzzzzzzzz") - log.Printf("%+v", kr) - if kr.AggregateID != 1 { - return nil, nil - } - - log.Println("bbbbbbbbbbbbbbbbbbbbbbb") - if cid.String() != kr.CorrelationID.String() { - log.Printf( - "Error: Correlation ID Mistmatch: Expected CorrelationID: %s, Got: %s", - cid.String(), - kr.CorrelationID.String(), - ) - return nil, nil - } - - if kr.Error != "" { - err = errors.Wrap(errors.New(kr.Error), "AuthKafkaResponseHandler Error") - krerr := gwerrors.NewKRError(err, kr.ErrorCode, err.Error()) - return nil, krerr - } - - log.Println("cccccccccccccccccccccccc") - user := &model.User{} - err = json.Unmarshal([]byte(kr.Result), user) - if err != nil { - err = errors.Wrap(err, "LoginResponseHandler: Error unmarshalling message-result into User") - krerr := gwerrors.NewKRError(err, gwerrors.InternalError, err.Error()) - return nil, krerr - } - - log.Println("dddddddddddddddddddddddd") - return user, nil -} - -func genAccessToken(user *model.User) (*model.AccessToken, error) { - accessExp := 15 * time.Minute - claims := &model.Claims{ - Role: user.Role, - Sub: user.UUID, - FirstName: user.FirstName, - LastName: user.LastName, - } - accessToken, err := model.NewAccessToken(accessExp, claims) - if err != nil { - err = errors.Wrap(err, "LoginResponseHandler: Error generating Access-Token") - return nil, err - } - - return accessToken, nil -} - -func genRefreshToken(ts auth.TokenStoreI, user *model.User) (*model.RefreshToken, error) { - refreshExp := (24 * 7) * time.Hour - refreshToken, err := model.NewRefreshToken(refreshExp, user.UUID) - if err != nil { - err = errors.Wrap(err, "Error generating Refresh-Token") - return nil, err - } - err = ts.Set(refreshToken) - // We continue executing the code even if storing refresh-token fails since other parts - // of application might still be accessible. - if err != nil { - err = errors.Wrapf( - err, - "Error storing RefreshToken in TokenStorage for UserID: %s", user.UUID, - ) - return nil, err - } - - return refreshToken, nil -} diff --git a/gql/resolver/authhandler_util.go b/gql/resolver/authhandler_util.go new file mode 100644 index 0000000..0b31c04 --- /dev/null +++ b/gql/resolver/authhandler_util.go @@ -0,0 +1,55 @@ +package resolver + +import ( + "time" + + "github.com/TerrexTech/go-apigateway/gwerrors" + + "github.com/TerrexTech/go-apigateway/auth" + "github.com/TerrexTech/go-apigateway/model" + "github.com/pkg/errors" +) + +// authResponse is the GraphQL response on successful authentication. +type authResponse struct { + tokens *model.AuthTokens + err *gwerrors.KRError +} + +func genAccessToken(user *model.User) (*model.AccessToken, error) { + accessExp := 15 * time.Minute + claims := &model.Claims{ + Role: user.Role, + Sub: user.UUID, + FirstName: user.FirstName, + LastName: user.LastName, + } + accessToken, err := model.NewAccessToken(accessExp, claims) + if err != nil { + err = errors.Wrap(err, "LoginResponseHandler: Error generating Access-Token") + return nil, err + } + + return accessToken, nil +} + +func genRefreshToken(ts auth.TokenStoreI, user *model.User) (*model.RefreshToken, error) { + refreshExp := (24 * 7) * time.Hour + refreshToken, err := model.NewRefreshToken(refreshExp, user.UUID) + if err != nil { + err = errors.Wrap(err, "Error generating Refresh-Token") + return nil, err + } + err = ts.Set(refreshToken) + // We continue executing the code even if storing refresh-token fails since other parts + // of application might still be accessible. + if err != nil { + err = errors.Wrapf( + err, + "Error storing RefreshToken in TokenStorage for UserID: %s", user.UUID, + ) + return nil, err + } + + return refreshToken, nil +} diff --git a/gql/resolver/login.go b/gql/resolver/login.go index 6c4aafc..d9131f0 100644 --- a/gql/resolver/login.go +++ b/gql/resolver/login.go @@ -1,10 +1,18 @@ package resolver import ( + "context" + "encoding/json" + "log" "os" + "time" "github.com/TerrexTech/go-apigateway/auth" - "github.com/TerrexTech/go-apigateway/kafka" + "github.com/TerrexTech/go-apigateway/gwerrors" + "github.com/TerrexTech/go-apigateway/model" + "github.com/TerrexTech/go-apigateway/util" + esmodel "github.com/TerrexTech/go-eventstore-models/model" + "github.com/TerrexTech/uuuid" "github.com/graphql-go/graphql" "github.com/pkg/errors" ) @@ -12,20 +20,127 @@ import ( // LoginResolver is the resolver for Login GraphQL query. var LoginResolver = func(params graphql.ResolveParams) (interface{}, error) { prodTopic := os.Getenv("KAFKA_PRODUCER_TOPIC_LOGIN") + consGroup := os.Getenv("KAFKA_CONSUMER_GROUP_LOGIN") consTopic := os.Getenv("KAFKA_CONSUMER_TOPIC_LOGIN") rootValue := params.Info.RootValue.(map[string]interface{}) - ka := rootValue["kafkaAdapter"].(*kafka.Adapter) - pio, err := ka.EnsureProducerIO(prodTopic, false) + kf := rootValue["kafkaFactory"].(*util.KafkaFactory) + ts := rootValue["tokenStore"].(auth.TokenStoreI) + + // Marshal User-credentials + credentialsJSON, err := json.Marshal(params.Args) + if err != nil { + err = errors.Wrap(err, "LoginResolver: Error marshalling credentials into JSON") + return nil, err + } + + // CorrelationID + cid, err := uuuid.NewV4() + if err != nil { + err = errors.Wrap(err, "LoginResolver: Error generating UUID for cid") + return nil, err + } + krpio, err := kf.EnsureKafkaResponseProducerIO(prodTopic, false) if err != nil { err = errors.Wrap(err, "Error creating ProducerIO for LoginResolver") return nil, err } - cio, err := ka.EnsureConsumerIO(consTopic, consTopic, false) + // Publish Auth-Request on Kafka Topic + go func() { + krpio.Input() <- &esmodel.KafkaResponse{ + CorrelationID: cid, + Input: credentialsJSON, + Topic: prodTopic, + } + }() + + cio, err := kf.EnsureConsumerIO(consGroup, consTopic, false, cid) if err != nil { err = errors.Wrap(err, "Error creating ConsumerIO for LoginResolver") return nil, err } - ts := rootValue["tokenStore"].(auth.TokenStoreI) - return authHandler(ts, params.Args, pio, cio) + // Timeout Context + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + +authResponseLoop: + // Check auth-response messages for matching CorrelationID and return result + for { + select { + case <-ctx.Done(): + break authResponseLoop + case msg := <-cio: + authRes := handleLoginResponse(msg, ts, cid) + if authRes != nil { + if authRes.err == nil { + return authRes.tokens, nil + } + return nil, errors.New(authRes.err.Error()) + } + } + } + + return nil, errors.New("Timed out") +} + +func handleLoginResponse( + kr esmodel.KafkaResponse, + ts auth.TokenStoreI, + cid uuuid.UUID, +) *authResponse { + if kr.Error != "" { + err := errors.New(kr.Error) + err = errors.Wrap(err, "LoginResponseHandler: Error in KafkaResponse") + krerr := gwerrors.NewKRError(err, kr.ErrorCode, err.Error()) + return &authResponse{ + tokens: nil, + err: krerr, + } + } + + user := &model.User{} + err := json.Unmarshal([]byte(kr.Result), user) + if err != nil { + err = errors.Wrap( + err, + "LoginResponseHandler: Error while Unmarshalling user into KafkaResponse", + ) + log.Println(err) + krerr := gwerrors.NewKRError(err, gwerrors.InternalError, err.Error()) + return &authResponse{ + tokens: nil, + err: krerr, + } + } + + at, err := genAccessToken(user) + if err != nil { + err = errors.Wrap( + err, + "LoginResponseHandler: Error generating AccessToken", + ) + krerr := gwerrors.NewKRError(err, gwerrors.InternalError, err.Error()) + return &authResponse{ + tokens: nil, + err: krerr, + } + } + + rt, err := genRefreshToken(ts, user) + if err != nil { + err = errors.Wrap(err, "LoginResponseHandler: Error generating RefreshToken") + krerr := gwerrors.NewKRError(err, gwerrors.InternalError, err.Error()) + return &authResponse{ + tokens: nil, + err: krerr, + } + } + + return &authResponse{ + tokens: &model.AuthTokens{ + AccessToken: at, + RefreshToken: rt, + }, + err: nil, + } } diff --git a/gql/resolver/register.go b/gql/resolver/register.go index 440e153..34085cd 100644 --- a/gql/resolver/register.go +++ b/gql/resolver/register.go @@ -8,55 +8,37 @@ import ( "os" "time" - "github.com/TerrexTech/go-apigateway/gwerrors" - - "github.com/Shopify/sarama" "github.com/TerrexTech/go-apigateway/auth" - "github.com/TerrexTech/go-apigateway/kafka" + "github.com/TerrexTech/go-apigateway/gwerrors" "github.com/TerrexTech/go-apigateway/model" + "github.com/TerrexTech/go-apigateway/util" esmodel "github.com/TerrexTech/go-eventstore-models/model" "github.com/TerrexTech/uuuid" "github.com/graphql-go/graphql" "github.com/pkg/errors" ) -type registerResponse struct { - authResponse *model.AuthResponse - authErr *gwerrors.KRError -} - +// RegisterResolver is the GraphQL resolver for Register-endpoint. var RegisterResolver = func(params graphql.ResolveParams) (interface{}, error) { - log.Println("00000000000000000000000000") prodTopic := os.Getenv("KAFKA_PRODUCER_TOPIC_REGISTER") consTopic := os.Getenv("KAFKA_CONSUMER_TOPIC_REGISTER") - log.Println("111111111111111111111111") // Marshal User-credentials credentialsJSON, err := json.Marshal(params.Args) if err != nil { err = errors.Wrap(err, "RegisterResolver: Error marshalling credentials into JSON") return nil, err } - log.Println(">>>>>Credentials") - log.Println(string(credentialsJSON)) - log.Println("22222222222222222222222222") rootValue := params.Info.RootValue.(map[string]interface{}) - ka := rootValue["kafkaAdapter"].(*kafka.Adapter) + ka := rootValue["kafkaFactory"].(*util.KafkaFactory) ts := rootValue["tokenStore"].(auth.TokenStoreI) - log.Println("333333333333333333333333333") - pio, err := ka.EnsureProducerEventIO(prodTopic, prodTopic, false) + epio, err := ka.EnsureEventProducerIO(prodTopic, prodTopic, false) if err != nil { err = errors.Wrap(err, "Error creating ProducerIO for RegisterResolver") return nil, err } - cio, err := ka.EnsureConsumerIO(consTopic, consTopic, false) - if err != nil { - err = errors.Wrap(err, "Error creating ConsumerIO for RegisterResolver") - return nil, err - } - // CorrelationID cid, err := uuuid.NewV4() if err != nil { @@ -69,11 +51,9 @@ var RegisterResolver = func(params graphql.ResolveParams) (interface{}, error) { return nil, err } - log.Println("444444444444444444444444") // Publish Auth-Request on Kafka Topic go func() { - log.Println("55555555555555555555") - pio.ProducerInput() <- &esmodel.Event{ + epio.Input() <- &esmodel.Event{ Action: "insert", CorrelationID: cid, AggregateID: 1, @@ -84,69 +64,71 @@ var RegisterResolver = func(params graphql.ResolveParams) (interface{}, error) { } }() - log.Println("666666666666666666666") // Timeout Context - ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() -registerResponseLoop: - // Check login-response messages for matching CorrelationID and return result - for { - select { - case <-ctx.Done(): - break registerResponseLoop - case msg := <-cio.ConsumerMessages(): - log.Println("777777777777777777777") - cio.MarkOffset() <- msg - log.Println("8888888888888888888888888") - loginRes := handleRegisterResponse(msg, ts, cid) - if loginRes != nil { - log.Println("------------------") - log.Println(loginRes.authErr) - log.Println(loginRes.authResponse) - if loginRes.authErr == nil { - log.Println("*******************((((((((") - return loginRes.authResponse, nil - } - ae := loginRes.authErr - log.Println(ae) - outErr := fmt.Errorf("%d: Registeration Error", ae.Code) - return nil, outErr + krChan, err := ka.EnsureConsumerIO(consTopic, consTopic, false, cid) + if err != nil { + err = errors.Wrap(err, "Error creating ConsumerIO for RegisterResolver") + return nil, err + } + + select { + case <-ctx.Done(): + return nil, errors.New("Timed out") + case kr := <-krChan: + registerResp := handleRegisterResponse(kr, ts) + if registerResp != nil { + if registerResp.err == nil { + return registerResp.tokens, nil } + ae := registerResp.err + outErr := fmt.Errorf("%d: Registration Error", ae.Code) + return nil, outErr } } - - return nil, errors.New("Timed out") + return nil, errors.New("Unknown Error") } func handleRegisterResponse( - msg *sarama.ConsumerMessage, + kr esmodel.KafkaResponse, ts auth.TokenStoreI, - cid uuuid.UUID, -) *registerResponse { - log.Println("8888888888888888888888888") - user, krerr := parseKafkaResponse(msg, cid) - if krerr != nil { - err := errors.Wrap(krerr.Err, "Error authenticating user") +) *authResponse { + if kr.Error != "" { + err := errors.Wrap(errors.New(kr.Error), "RegisterResponseHandler: Error in KafkaResponse") + krerr := gwerrors.NewKRError(err, kr.ErrorCode, err.Error()) + return &authResponse{ + tokens: nil, + err: krerr, + } + } + + user := &model.User{} + err := json.Unmarshal([]byte(kr.Result), user) + if err != nil { + err = errors.Wrap( + err, + "RegisterResponseHandler: Error while Unmarshalling user into KafkaResponse", + ) log.Println(err) krerr := gwerrors.NewKRError(err, gwerrors.InternalError, err.Error()) - return ®isterResponse{ - authResponse: nil, - authErr: krerr, + return &authResponse{ + tokens: nil, + err: krerr, } } - if user == nil && krerr == nil { - return nil - } - log.Println("9999999999999999999999999") at, err := genAccessToken(user) if err != nil { - err = errors.Wrap(err, "Error generating AccessToken") + err = errors.Wrap( + err, + "RegisterResponseHandler: Error generating AccessToken", + ) krerr := gwerrors.NewKRError(err, gwerrors.InternalError, err.Error()) - return ®isterResponse{ - authResponse: nil, - authErr: krerr, + return &authResponse{ + tokens: nil, + err: krerr, } } @@ -154,17 +136,17 @@ func handleRegisterResponse( if err != nil { err = errors.Wrap(err, "Error generating RefreshToken") krerr := gwerrors.NewKRError(err, gwerrors.InternalError, err.Error()) - return ®isterResponse{ - authResponse: nil, - authErr: krerr, + return &authResponse{ + tokens: nil, + err: krerr, } } - return ®isterResponse{ - authResponse: &model.AuthResponse{ + return &authResponse{ + tokens: &model.AuthTokens{ AccessToken: at, RefreshToken: rt, }, - authErr: nil, + err: nil, } } diff --git a/kafka/adapter.go b/kafka/adapter.go deleted file mode 100644 index 561144e..0000000 --- a/kafka/adapter.go +++ /dev/null @@ -1,272 +0,0 @@ -package kafka - -import ( - "encoding/json" - "log" - "time" - - "github.com/Shopify/sarama" - "github.com/TerrexTech/go-eventstore-models/model" - "github.com/TerrexTech/go-kafkautils/consumer" - "github.com/TerrexTech/go-kafkautils/producer" - "github.com/TerrexTech/uuuid" - cluster "github.com/bsm/sarama-cluster" - "github.com/pkg/errors" -) - -var groupIDSuffix = func() string { - id, err := uuuid.NewV4() - if err != nil { - err = errors.Wrap(err, "KafkaAdapter: Error generating GroupIDSuffix") - log.Fatalln(err) - } - return id.String() -}() - -var pioStore = map[string]*ProducerIO{} -var peioStore = map[string]*ProducerEventIO{} -var cioStore = map[string]*ConsumerIO{} - -// Adapter allows conveniently connecting to Adapter, and creates required -// Topics and channels for Adapter-communication. -type Adapter struct { - Brokers []string -} - -// producer creates a new Kafka-Producer used for producing the -// responses after processing consumed Kafka-messages. -func (ka *Adapter) producer( - brokers []string, -) (*producer.Producer, error) { - config := producer.Config{ - KafkaBrokers: brokers, - } - resProducer, err := producer.New(&config) - if err != nil { - return nil, err - } - return resProducer, nil -} - -// Consumer creates a new Kafka-Consumer which listens for the events. -func (ka *Adapter) consumer(name string, topics []string) (*consumer.Consumer, error) { - saramaCfg := cluster.NewConfig() - saramaCfg.Consumer.Offsets.Initial = sarama.OffsetOldest - saramaCfg.Consumer.MaxProcessingTime = 10 * time.Second - saramaCfg.Consumer.Return.Errors = true - - config := &consumer.Config{ - ConsumerGroup: name, - KafkaBrokers: ka.Brokers, - SaramaConfig: saramaCfg, - Topics: topics, - } - - return consumer.New(config) -} - -func (ka *Adapter) newConsumerIO( - name string, - topics []string, - enableErrors bool, -) (*ConsumerIO, error) { - // Create Kafka Event-Consumer - reqConsumer, err := ka.consumer(name, topics) - if err != nil { - err = errors.Wrap(err, "Error Creating Response ConsumerGroup for Events") - return nil, err - } - log.Println("Created Kafka Response ConsumerGroup") - - // A channel which receives consumer-messages to be committed - consumerOffsetChan := make(chan *sarama.ConsumerMessage) - cio := &ConsumerIO{ - consumerErrChan: reqConsumer.Errors(), - consumerMsgChan: reqConsumer.Messages(), - consumerOffsetChan: (chan<- *sarama.ConsumerMessage)(consumerOffsetChan), - } - - if !enableErrors { - go func() { - for err := range reqConsumer.Errors() { - parsedErr := errors.Wrap(err, "Producer Error") - log.Println(parsedErr) - log.Println(err) - } - }() - } - - go func() { - for msg := range consumerOffsetChan { - reqConsumer.MarkOffset(msg, "") - } - }() - log.Println("Created Kafka Response Offset-Commit Channel") - - log.Println("ConsumerIO Ready") - return cio, nil -} - -func (ka *Adapter) newProducerIO(id string, enableErrors bool) (*ProducerIO, error) { - // Create Kafka Response-Producer - resProducer, err := ka.producer(ka.Brokers) - if err != nil { - err = errors.Wrap(err, "Error Creating Response Producer") - return nil, err - } - log.Println("Create Kafka Response-Producer") - resProducerInput, err := resProducer.Input() - if err != nil { - err = errors.Wrap(err, "Error Getting Input-Channel from Producer") - return nil, err - } - - // Setup Producer I/O channels - producerInputChan := make(chan *model.KafkaResponse) - pio := &ProducerIO{ - id: id, - producerInputChan: (chan<- *model.KafkaResponse)(producerInputChan), - producerErrChan: resProducer.Errors(), - } - - if !enableErrors { - go func() { - for err := range resProducer.Errors() { - parsedErr := errors.Wrap(err.Err, "Producer Error") - log.Println(parsedErr) - log.Println(err) - } - }() - } - - // The Kafka-Response post-processing the consumed events - go func() { - for msg := range producerInputChan { - // No need to unnecessarily increase message payload-size, so we remove "Topic". - topic := msg.Topic - msg.Topic = "" - - msgJSON, err := json.Marshal(msg) - if err != nil { - err = errors.Wrapf(err, - "Error Marshalling KafkaResponse with CorrelationID: %s and AggregateID: %d, "+ - "on topic %s", - msg.CorrelationID, - msg.AggregateID, - topic, - ) - log.Println(err) - return - } - - producerMsg := producer.CreateMessage(topic, msgJSON) - resProducerInput <- producerMsg - } - }() - log.Println("ProducerIO Ready") - return pio, nil -} - -func (ka *Adapter) newProducerEventIO(topic string, id string, enableErrors bool) (*ProducerEventIO, error) { - // Create Kafka Response-Producer - resProducer, err := ka.producer(ka.Brokers) - if err != nil { - err = errors.Wrap(err, "Error Creating Response Producer") - return nil, err - } - log.Println("Create Kafka Response-Producer") - resProducerInput, err := resProducer.Input() - if err != nil { - err = errors.Wrap(err, "Error Getting Input-Channel from Producer") - return nil, err - } - - // Setup Producer I/O channels - producerInputChan := make(chan *model.Event) - pio := &ProducerEventIO{ - id: id, - producerInputChan: (chan<- *model.Event)(producerInputChan), - producerErrChan: resProducer.Errors(), - } - - if !enableErrors { - go func() { - for err := range resProducer.Errors() { - parsedErr := errors.Wrap(err.Err, "Producer Error") - log.Println(parsedErr) - log.Println(err) - } - }() - } - - // The Kafka-Response post-processing the consumed events - go func() { - for msg := range producerInputChan { - msgJSON, err := json.Marshal(msg) - if err != nil { - err = errors.Wrapf(err, - "Error Marshalling Event with UUID: %s and AggregateID: %d, "+ - "on topic %s", - msg.UUID, - msg.AggregateID, - topic, - ) - log.Println(err) - return - } - - producerMsg := producer.CreateMessage(topic, msgJSON) - resProducerInput <- producerMsg - } - }() - log.Println("ProducerIO Ready") - return pio, nil -} - -func (ka *Adapter) EnsureProducerEventIO( - topic string, - id string, - enableErrors bool, -) (*ProducerEventIO, error) { - if peioStore[id] == nil { - p, err := ka.newProducerEventIO(topic, id, enableErrors) - if err != nil { - err = errors.Wrap(err, "Error creating ProducerEventIO") - return nil, err - } - peioStore[id] = p - } - return peioStore[id], nil -} - -func (ka *Adapter) EnsureConsumerIO( - id string, - topic string, - enableErrors bool, -) (*ConsumerIO, error) { - if cioStore[id] == nil { - name := id + "-" + groupIDSuffix - c, err := ka.newConsumerIO(name, []string{topic}, enableErrors) - if err != nil { - err = errors.Wrap(err, "Error creating ConsumerIO") - return nil, err - } - cioStore[id] = c - } - return cioStore[id], nil -} - -func (ka *Adapter) EnsureProducerIO( - id string, - enableErrors bool, -) (*ProducerIO, error) { - if pioStore[id] == nil { - p, err := ka.newProducerIO(id, enableErrors) - if err != nil { - err = errors.Wrap(err, "Error creating ProducerIO") - return nil, err - } - pioStore[id] = p - } - return pioStore[id], nil -} diff --git a/kafka/io.go b/kafka/io.go deleted file mode 100644 index eb6cfde..0000000 --- a/kafka/io.go +++ /dev/null @@ -1,70 +0,0 @@ -package kafka - -import ( - "github.com/Shopify/sarama" - "github.com/TerrexTech/go-eventstore-models/model" -) - -// IO provides channels for interacting with Kafka. -// Note: All receive-channels must be read from to prevent deadlock. -type ConsumerIO struct { - consumerErrChan <-chan error - consumerMsgChan <-chan *sarama.ConsumerMessage - consumerOffsetChan chan<- *sarama.ConsumerMessage -} - -// ConsumerErrors returns send-channel where consumer errors are published. -func (cio *ConsumerIO) ConsumerErrors() <-chan error { - return cio.consumerErrChan -} - -// ConsumerMessages returns send-channel where consumer messages are published. -func (cio *ConsumerIO) ConsumerMessages() <-chan *sarama.ConsumerMessage { - return cio.consumerMsgChan -} - -// MarkOffset marks the consumer message-offset to be committed. -// This should be used once a message has done its job. -func (cio *ConsumerIO) MarkOffset() chan<- *sarama.ConsumerMessage { - return cio.consumerOffsetChan -} - -type ProducerIO struct { - id string - producerErrChan <-chan *sarama.ProducerError - producerInputChan chan<- *model.KafkaResponse -} - -func (pio *ProducerIO) ID() string { - return pio.id -} - -// ProducerErrors returns send-channel where producer errors are published. -func (pio *ProducerIO) ProducerErrors() <-chan *sarama.ProducerError { - return pio.producerErrChan -} - -// ProducerInput returns receive-channel where kafka-responses can be produced. -func (pio *ProducerIO) ProducerInput() chan<- *model.KafkaResponse { - return pio.producerInputChan -} - -type ProducerEventIO struct { - id string - producerErrChan <-chan *sarama.ProducerError - producerInputChan chan<- *model.Event -} - -func (pio *ProducerEventIO) ID() string { - return pio.id -} - -// ProducerErrors returns send-channel where producer errors are published. -func (pio *ProducerEventIO) ProducerErrors() <-chan *sarama.ProducerError { - return pio.producerErrChan -} - -// ProducerInput returns receive-channel where kafka-responses can be produced. -func (pio *ProducerEventIO) ProducerInput() chan<- *model.Event { - return pio.producerInputChan -} diff --git a/main/.env b/main/.env index ed6f8a8..0f34f45 100644 --- a/main/.env +++ b/main/.env @@ -1,5 +1,6 @@ KAFKA_BROKERS=kafka:9092 +KAFKA_CONSUMER_GROUP_LOGIN=auth.login.consumergroup KAFKA_CONSUMER_TOPIC_LOGIN=auth.login.response KAFKA_PRODUCER_TOPIC_LOGIN=auth.login.request diff --git a/main/debug b/main/debug deleted file mode 100644 index dc00896..0000000 Binary files a/main/debug and /dev/null differ diff --git a/main/kafkaio.go b/main/kafkaio.go deleted file mode 100644 index 40a65ee..0000000 --- a/main/kafkaio.go +++ /dev/null @@ -1,18 +0,0 @@ -package main - -// // Creates a KafkaIO from KafkaAdapter based on set environment variables. -// func initKafkaIO() (*kafka.IO, error) { -// brokers := os.Getenv("KAFKA_BROKERS") -// consumerGroupName := os.Getenv("KAFKA_CONSUMER_GROUP_LOGIN") -// consumerTopics := os.Getenv("KAFKA_CONSUMER_TOPIC_LOGIN") -// responseTopic := os.Getenv("KAFKA_PRODUCER_TOPIC_LOGIN") - -// kafkaAdapter := &kafka.Adapter{ -// Brokers: *commonutil.ParseHosts(brokers), -// ConsumerGroupName: consumerGroupName, -// ConsumerTopics: *commonutil.ParseHosts(consumerTopics), -// ProducerTopic: responseTopic, -// } - -// return kafkaAdapter.InitIO() -// } diff --git a/main/main.go b/main/main.go index 06f6b2e..dcb9364 100644 --- a/main/main.go +++ b/main/main.go @@ -9,7 +9,7 @@ import ( "github.com/TerrexTech/go-apigateway/auth" "github.com/TerrexTech/go-apigateway/gql/schema" - "github.com/TerrexTech/go-apigateway/kafka" + "github.com/TerrexTech/go-apigateway/util" "github.com/TerrexTech/go-commonutils/commonutil" "github.com/go-redis/redis" "github.com/graphql-go/graphql" @@ -21,7 +21,7 @@ var ( // Schema represents a GraphQL schema Schema graphql.Schema rootObject map[string]interface{} - kafkaAdapter *kafka.Adapter + kafkaFactory *util.KafkaFactory ) func initService() { @@ -37,6 +37,7 @@ func initService() { missingVar, err := commonutil.ValidateEnv( "KAFKA_BROKERS", + "KAFKA_CONSUMER_GROUP_LOGIN", "KAFKA_CONSUMER_TOPIC_LOGIN", "KAFKA_PRODUCER_TOPIC_LOGIN", "KAFKA_CONSUMER_TOPIC_REGISTER", @@ -76,7 +77,7 @@ func initService() { // Kafka Setup brokers := os.Getenv("KAFKA_BROKERS") - kafkaAdapter := &kafka.Adapter{ + kafkaFactory := &util.KafkaFactory{ Brokers: *commonutil.ParseHosts(brokers), } @@ -113,7 +114,7 @@ func initService() { } rootObject = map[string]interface{}{ - "kafkaAdapter": kafkaAdapter, + "kafkaFactory": kafkaFactory, "tokenStore": tokenStore, "db": db, } diff --git a/model/authtoken.go b/model/authtoken.go index 4561add..20a7e72 100644 --- a/model/authtoken.go +++ b/model/authtoken.go @@ -1,7 +1,7 @@ package model -//AuthResponse is the reponse for a successful authentication request. -type AuthResponse struct { +//AuthTokens is the reponse for a successful authentication request. +type AuthTokens struct { AccessToken *AccessToken `json:"access_token"` RefreshToken *RefreshToken `json:"refresh_token"` } diff --git a/util/cid_subadapter.go b/util/cid_subadapter.go new file mode 100644 index 0000000..f657ad8 --- /dev/null +++ b/util/cid_subadapter.go @@ -0,0 +1,42 @@ +package util + +import ( + "sync" + + "github.com/TerrexTech/go-eventstore-models/model" + "github.com/TerrexTech/uuuid" +) + +// CIDStore is the global CorrelationID (CID) storage, intended to be used for operations +// that require sharing CIDs across instances (go routines) +type CIDSubAdapter struct { + cid uuuid.UUID + cidMap map[string]CIDSubAdapter + readChan chan model.KafkaResponse +} + +func newCIDSubAdapter( + topicCIDMap map[string]CIDSubAdapter, + cid uuuid.UUID, + topicMapLock *sync.RWMutex, +) *CIDSubAdapter { + readChan := make(chan model.KafkaResponse, 1) + + ca := CIDSubAdapter{ + cid: cid, + cidMap: topicCIDMap, + readChan: readChan, + } + topicMapLock.Lock() + topicCIDMap[cid.String()] = ca + topicMapLock.Unlock() + return &ca +} + +func (c *CIDSubAdapter) read() <-chan model.KafkaResponse { + return c.readChan +} + +func (c *CIDSubAdapter) write(kr model.KafkaResponse) { + c.readChan <- kr +} diff --git a/util/kafkaconsumer_factory.go b/util/kafkaconsumer_factory.go new file mode 100644 index 0000000..d2418bc --- /dev/null +++ b/util/kafkaconsumer_factory.go @@ -0,0 +1,131 @@ +package util + +import ( + "encoding/json" + "log" + "sync" + "time" + + "github.com/Shopify/sarama" + "github.com/TerrexTech/go-eventstore-models/model" + "github.com/TerrexTech/go-kafkautils/consumer" + "github.com/TerrexTech/uuuid" + cluster "github.com/bsm/sarama-cluster" + "github.com/pkg/errors" +) + +// Consumer creates a new Kafka-Consumer which listens for the events. +func (ka *KafkaFactory) consumer(name string, topics []string) (*consumer.Consumer, error) { + saramaCfg := cluster.NewConfig() + saramaCfg.Consumer.Offsets.Initial = sarama.OffsetOldest + saramaCfg.Consumer.MaxProcessingTime = 10 * time.Second + saramaCfg.Consumer.Return.Errors = true + + config := &consumer.Config{ + ConsumerGroup: name, + KafkaBrokers: ka.Brokers, + SaramaConfig: saramaCfg, + Topics: topics, + } + + return consumer.New(config) +} + +func (ka *KafkaFactory) EnsureConsumerIO( + group string, + topic string, + enableErrors bool, + cid uuuid.UUID, +) (<-chan model.KafkaResponse, error) { + cidMapLock := &sync.RWMutex{} + + id := group + topic + if cioStore[id] == nil { + // Create Kafka Event-Consumer + reqConsumer, err := ka.consumer(group, []string{topic}) + if err != nil { + err = errors.Wrap(err, "Error Creating Response ConsumerGroup for Events") + return nil, err + } + log.Println("Created Kafka Response ConsumerGroup") + + cioStore[id] = reqConsumer + + go func() { + for msg := range reqConsumer.Messages() { + reqConsumer.MarkOffset(msg, "") + go handleKafkaConsumerMsg(msg, cidMap, cidMapLock) + } + }() + } + + cidMapLock.RLock() + topicCIDMap := cidMap[topic] + cidMapLock.RUnlock() + + if topicCIDMap == nil { + cidMapLock.Lock() + cidMap[topic] = map[string]CIDSubAdapter{} + cidMapLock.Unlock() + + cidMapLock.RLock() + topicCIDMap = cidMap[topic] + cidMapLock.RUnlock() + } + sa := newCIDSubAdapter(topicCIDMap, cid, cidMapLock) + return sa.read(), nil +} + +func handleKafkaConsumerMsg( + msg *sarama.ConsumerMessage, + cidMap map[string]map[string]CIDSubAdapter, + cidMapLock *sync.RWMutex, +) { + kr := model.KafkaResponse{} + err := json.Unmarshal(msg.Value, &kr) + if err != nil { + err = errors.Wrap(err, "Error Unmarshalling Kafka-Message into Kafka-Response") + log.Println(err) + return + } + + cidMapLock.RLock() + topicCIDMap := cidMap[msg.Topic] + cidMapLock.RUnlock() + + if topicCIDMap == nil { + cidMapLock.Lock() + cidMap[msg.Topic] = make(map[string]CIDSubAdapter) + cidMapLock.Unlock() + + cidMapLock.RLock() + topicCIDMap = cidMap[msg.Topic] + cidMapLock.RUnlock() + } + krcid := kr.CorrelationID.String() + + topicMapLock := &sync.RWMutex{} + + topicMapLock.RLock() + cidSubAdapter, exists := topicCIDMap[krcid] + topicMapLock.RUnlock() + + if exists { + topicMapLock.Lock() + delete(topicCIDMap, krcid) + topicMapLock.Unlock() + + cidSubAdapter.write(kr) + return + } + + cidSubAdapter = *newCIDSubAdapter( + topicCIDMap, + kr.CorrelationID, + topicMapLock, + ) + + topicMapLock.Lock() + topicCIDMap[krcid] = cidSubAdapter + topicMapLock.Unlock() +} diff --git a/util/kafkafactory.go b/util/kafkafactory.go new file mode 100644 index 0000000..9ae6b73 --- /dev/null +++ b/util/kafkafactory.go @@ -0,0 +1,19 @@ +package util + +import ( + "github.com/TerrexTech/go-kafkautils/consumer" +) + +var ( + cidMap = make(map[string]map[string]CIDSubAdapter) + + epioStore = map[string]*EventProducerIO{} + krpioStore = map[string]*KafkaResponseProducerIO{} + cioStore = map[string]*consumer.Consumer{} +) + +// KafkaFactory allows conveniently connecting to KafkaFactory, and creates required +// Topics and channels for KafkaFactory-communication. +type KafkaFactory struct { + Brokers []string +} diff --git a/util/kafkaio.go b/util/kafkaio.go new file mode 100644 index 0000000..0f85606 --- /dev/null +++ b/util/kafkaio.go @@ -0,0 +1,66 @@ +package util + +import ( + "github.com/Shopify/sarama" + "github.com/TerrexTech/go-eventstore-models/model" +) + +type producerIO struct { + pid string + producerErrChan <-chan *sarama.ProducerError + producerInputChan chan<- *sarama.ProducerMessage +} + +func (pio *producerIO) id() string { + return pio.pid +} + +// ProducerErrors returns send-channel where producer errors are published. +func (pio *producerIO) errors() <-chan *sarama.ProducerError { + return pio.producerErrChan +} + +// ProducerInput returns receive-channel where kafka-responses can be produced. +func (pio *producerIO) input() chan<- *sarama.ProducerMessage { + return pio.producerInputChan +} + +type KafkaResponseProducerIO struct { + id string + errChan <-chan *sarama.ProducerError + inputChan chan<- *model.KafkaResponse +} + +func (pio *KafkaResponseProducerIO) ID() string { + return pio.id +} + +// ProducerErrors returns send-channel where producer errors are published. +func (pio *KafkaResponseProducerIO) Errors() <-chan *sarama.ProducerError { + return pio.errChan +} + +// ProducerInput returns receive-channel where kafka-responses can be produced. +func (pio *KafkaResponseProducerIO) Input() chan<- *model.KafkaResponse { + return pio.inputChan +} + +type EventProducerIO struct { + id string + errChan <-chan *sarama.ProducerError + inputChan chan<- *model.Event +} + +func (epio *EventProducerIO) ID() string { + return epio.id +} + +// ProducerErrors returns send-channel where producer errors are published. +func (epio *EventProducerIO) Errors() <-chan *sarama.ProducerError { + return epio.errChan +} + +// ProducerInput returns receive-channel where kafka-responses can be produced. +func (epio *EventProducerIO) Input() chan<- *model.Event { + return epio.inputChan +} diff --git a/util/kafkaproducer_factory.go b/util/kafkaproducer_factory.go new file mode 100644 index 0000000..93ee8b5 --- /dev/null +++ b/util/kafkaproducer_factory.go @@ -0,0 +1,188 @@ +package util + +import ( + "encoding/json" + "log" + "os" + + "github.com/Shopify/sarama" + "github.com/TerrexTech/go-eventstore-models/model" + "github.com/TerrexTech/go-kafkautils/producer" + "github.com/pkg/errors" +) + +// producer creates a new Kafka-Producer used for producing the +// responses after processing consumed Kafka-messages. +func (ka *KafkaFactory) producer( + brokers []string, +) (*producer.Producer, error) { + config := producer.Config{ + KafkaBrokers: brokers, + } + resProducer, err := producer.New(&config) + if err != nil { + return nil, err + } + return resProducer, nil +} + +func (ka *KafkaFactory) newProducerIO(id string) (*producerIO, error) { + // Create Kafka Response-Producer + resProducer, err := ka.producer(ka.Brokers) + if err != nil { + err = errors.Wrap(err, "Error Creating Kafka-Producer") + return nil, err + } + log.Println("Creating Kafka-Producer") + resProducerInput, err := resProducer.Input() + if err != nil { + err = errors.Wrap(err, "Error getting Input-Channel from Kafka-Producer") + return nil, err + } + + // Setup Producer I/O channels + producerInputChan := make(chan *sarama.ProducerMessage) + pio := &producerIO{ + pid: id, + producerInputChan: (chan<- *sarama.ProducerMessage)(producerInputChan), + producerErrChan: resProducer.Errors(), + } + + // The Kafka-Response post-processing the consumed events + go func() { + for msg := range producerInputChan { + resProducerInput <- msg + } + }() + log.Println("ProducerIO Ready") + return pio, nil +} + +func (ka *KafkaFactory) newEventProducerIO( + id string, enableErrors bool, +) (*EventProducerIO, error) { + pio, err := ka.newProducerIO(id) + if err != nil { + err = errors.Wrap(err, "Error creating Event-ProducerIO") + } + + inputChan := make(chan *model.Event) + epio := &EventProducerIO{ + id: id, + inputChan: (chan<- *model.Event)(inputChan), + errChan: pio.errors(), + } + + if !enableErrors { + go func() { + for err := range epio.errChan { + parsedErr := errors.Wrap(err.Err, "Event-Producer Error") + log.Println(parsedErr) + log.Println(err) + } + }() + } + + prodTopic := os.Getenv("KAFKA_PRODUCER_TOPIC_REGISTER") + go func() { + for msg := range inputChan { + msgJSON, err := json.Marshal(msg) + if err != nil { + err = errors.Wrapf(err, + "Error Marshalling KafkaResponse with CorrelationID: %s and AggregateID: %d, "+ + "on topic %s", + msg.CorrelationID, + msg.AggregateID, + prodTopic, + ) + log.Println(err) + continue + } + + producerMsg := producer.CreateMessage(prodTopic, msgJSON) + pio.input() <- producerMsg + } + }() + log.Println("KafkaResponse-ProducerIO Ready") + return epio, nil +} + +func (ka *KafkaFactory) newKafkaResponseProducerIO( + id string, enableErrors bool, +) (*KafkaResponseProducerIO, error) { + pio, err := ka.newProducerIO(id) + if err != nil { + err = errors.Wrap(err, "Error creating KafkaResponse-ProducerIO") + } + + inputChan := make(chan *model.KafkaResponse) + krpio := &KafkaResponseProducerIO{ + id: id, + inputChan: (chan<- *model.KafkaResponse)(inputChan), + errChan: pio.errors(), + } + + if !enableErrors { + go func() { + for err := range krpio.errChan { + parsedErr := errors.Wrap(err.Err, "KafkaResponse-Producer Error") + log.Println(parsedErr) + log.Println(err) + } + }() + } + + // The Kafka-Response post-processing the consumed events + go func() { + for msg := range inputChan { + msgJSON, err := json.Marshal(msg) + if err != nil { + err = errors.Wrapf(err, + "Error Marshalling KafkaResponse with CorrelationID: %s and AggregateID: %d, "+ + "on topic %s", + msg.CorrelationID, + msg.AggregateID, + msg.Topic, + ) + log.Println(err) + continue + } + + producerMsg := producer.CreateMessage(msg.Topic, msgJSON) + pio.input() <- producerMsg + } + }() + log.Println("KafkaResponse-ProducerIO Ready") + return krpio, nil +} + +func (ka *KafkaFactory) EnsureEventProducerIO( + topic string, + id string, + enableErrors bool, +) (*EventProducerIO, error) { + if epioStore[id] == nil { + p, err := ka.newEventProducerIO(id, enableErrors) + if err != nil { + err = errors.Wrap(err, "Error creating ProducerEventIO") + return nil, err + } + epioStore[id] = p + } + return epioStore[id], nil +} + +func (ka *KafkaFactory) EnsureKafkaResponseProducerIO( + id string, + enableErrors bool, +) (*KafkaResponseProducerIO, error) { + if krpioStore[id] == nil { + p, err := ka.newKafkaResponseProducerIO(id, enableErrors) + if err != nil { + err = errors.Wrap(err, "Error creating ProducerIO") + return nil, err + } + krpioStore[id] = p + } + return krpioStore[id], nil +}