From 41ee00a287996d7b44afc0f053a5c685c1b3cb99 Mon Sep 17 00:00:00 2001 From: Jaskaranbir Date: Thu, 15 Nov 2018 11:06:55 -0500 Subject: [PATCH] Add flashsale endpoint --- gql/entity/flashsale/README.md | 26 ++++ gql/entity/flashsale/mutations.go | 26 ++++ gql/entity/flashsale/queries.go | 94 +++++++++++++ gql/entity/flashsale/resolver/insert.go | 121 ++++++++++++++++ gql/entity/flashsale/resolver/query.go | 140 +++++++++++++++++++ gql/entity/flashsale/resolver/query_count.go | 15 ++ gql/entity/flashsale/types.go | 46 ++++++ gql/schema/mutation.go | 2 + 8 files changed, 470 insertions(+) create mode 100644 gql/entity/flashsale/README.md create mode 100644 gql/entity/flashsale/mutations.go create mode 100644 gql/entity/flashsale/queries.go create mode 100644 gql/entity/flashsale/resolver/insert.go create mode 100644 gql/entity/flashsale/resolver/query.go create mode 100644 gql/entity/flashsale/resolver/query_count.go create mode 100644 gql/entity/flashsale/types.go diff --git a/gql/entity/flashsale/README.md b/gql/entity/flashsale/README.md new file mode 100644 index 0000000..bc0006b --- /dev/null +++ b/gql/entity/flashsale/README.md @@ -0,0 +1,26 @@ +## Usage examples +--- + +### Mutations + +* #### InventoryInsert + ```graphql + mutation{ + SaleInsert( + saleID: "cdc7a14c-19e3-488e-8c4e-22d91fd42ef1", + items: [ + { + itemID: "39322979-d33b-4504-ba90-f2e427bdd72b", + weight: 12.40, + lot: "test-lot", + upc: "test-upc", + sku: "test-sku" + } + ] + timestamp: 1539222685400, + ) + { + timestamp + } + } + ``` diff --git a/gql/entity/flashsale/mutations.go b/gql/entity/flashsale/mutations.go new file mode 100644 index 0000000..4a10b2c --- /dev/null +++ b/gql/entity/flashsale/mutations.go @@ -0,0 +1,26 @@ +package flashsale + +import ( + "github.com/TerrexTech/go-apigateway/gql/entity/flashsale/resolver" + "github.com/graphql-go/graphql" +) + +// Mutations are GraphQL mutations for Inventory. +var Mutations = map[string]*graphql.Field{ + "FlashSaleInsert": &graphql.Field{ + Type: Sale, + Description: "Inserts item into Inventory", + Args: graphql.FieldConfigArgument{ + "flashSaleID": &graphql.ArgumentConfig{ + Type: graphql.String, + }, + "items": &graphql.ArgumentConfig{ + Type: graphql.NewList(SaleInput), + }, + "timestamp": &graphql.ArgumentConfig{ + Type: graphql.Float, + }, + }, + Resolve: resolver.Insert, + }, +} diff --git a/gql/entity/flashsale/queries.go b/gql/entity/flashsale/queries.go new file mode 100644 index 0000000..a8e4a0c --- /dev/null +++ b/gql/entity/flashsale/queries.go @@ -0,0 +1,94 @@ +package flashsale + +import ( + "github.com/TerrexTech/go-apigateway/gql/entity/sale/resolver" + "github.com/graphql-go/graphql" +) + +// Queries are GraphQL queries for Inventory +var Queries = map[string]*graphql.Field{ + // "InventoryQueryItem": &graphql.Field{ + // Type: graphql.NewList(Inventory), + // Description: "Inventory Query", + // Args: graphql.FieldConfigArgument{ + // "itemID": &graphql.ArgumentConfig{ + // Type: graphql.String, + // }, + // "dateArrived": &graphql.ArgumentConfig{ + // Type: graphql.Float, + // }, + // "dateSold": &graphql.ArgumentConfig{ + // Type: graphql.Float, + // }, + // "deviceID": &graphql.ArgumentConfig{ + // Type: graphql.String, + // }, + // "donateWeight": &graphql.ArgumentConfig{ + // Type: graphql.Float, + // }, + // "lot": &graphql.ArgumentConfig{ + // Type: graphql.String, + // }, + // "name": &graphql.ArgumentConfig{ + // Type: graphql.String, + // }, + // "origin": &graphql.ArgumentConfig{ + // Type: graphql.String, + // }, + // "price": &graphql.ArgumentConfig{ + // Type: graphql.Float, + // }, + // "rsCustomerID": &graphql.ArgumentConfig{ + // Type: graphql.String, + // }, + // "salePrice": &graphql.ArgumentConfig{ + // Type: graphql.Float, + // }, + // "sku": &graphql.ArgumentConfig{ + // Type: graphql.String, + // }, + // "soldWeight": &graphql.ArgumentConfig{ + // Type: graphql.Float, + // }, + // "timestamp": &graphql.ArgumentConfig{ + // Type: graphql.Float, + // }, + // "totalWeight": &graphql.ArgumentConfig{ + // Type: graphql.Float, + // }, + // "upc": &graphql.ArgumentConfig{ + // Type: graphql.String, + // }, + // "wasteWeight": &graphql.ArgumentConfig{ + // Type: graphql.Float, + // }, + // }, + // Resolve: resolver.QueryItem, + // }, + // "InventoryQueryTimestamp": &graphql.Field{ + // Type: graphql.NewList(Inventory), + // Description: "Inventory Query by Timestamp", + // Args: graphql.FieldConfigArgument{ + // "start": &graphql.ArgumentConfig{ + // Type: graphql.Int, + // }, + // "end": &graphql.ArgumentConfig{ + // Type: graphql.Int, + // }, + // "count": &graphql.ArgumentConfig{ + // Type: graphql.Int, + // }, + // }, + // Resolve: resolver.QueryTimestamp, + // }, + "FlashSaleQueryCount": &graphql.Field{ + Type: graphql.NewList(Sale), + Description: "Returns latest sales as specified in count", + Args: graphql.FieldConfigArgument{ + "count": &graphql.ArgumentConfig{ + Type: graphql.Int, + }, + }, + Resolve: resolver.QueryCount, + }, +} diff --git a/gql/entity/flashsale/resolver/insert.go b/gql/entity/flashsale/resolver/insert.go new file mode 100644 index 0000000..6896e02 --- /dev/null +++ b/gql/entity/flashsale/resolver/insert.go @@ -0,0 +1,121 @@ +package resolver + +import ( + "context" + "encoding/json" + "fmt" + "log" + "os" + "time" + + "github.com/TerrexTech/go-apigateway/gql/response" + "github.com/TerrexTech/go-apigateway/gwerrors" + "github.com/TerrexTech/go-apigateway/util" + esmodel "github.com/TerrexTech/go-eventstore-models/model" + "github.com/TerrexTech/uuuid" + "github.com/graphql-go/graphql" + "github.com/pkg/errors" +) + +// Insert is the GraphQL resolver for InsertSale GraphQL query. +var Insert = func(params graphql.ResolveParams) (interface{}, error) { + consTopic := os.Getenv("KAFKA_CONSUMER_TOPIC_FLASHSALE") + + // Marshal Sale-data + saleJSON, err := json.Marshal(params.Args) + if err != nil { + err = errors.Wrap(err, "InvInsertResponseHandler: Error marshalling credentials into JSON") + return nil, err + } + log.Println(string(saleJSON)) + + rootValue := params.Info.RootValue.(map[string]interface{}) + kf := rootValue["kafkaFactory"].(*util.KafkaFactory) + + // CorrelationID + cid, err := uuuid.NewV4() + if err != nil { + err = errors.Wrap(err, "InvInsertResponseHandler: Error generating UUID for cid") + return nil, err + } + eventID, err := uuuid.NewV4() + if err != nil { + err = errors.Wrap(err, "InvInsertResponseHandler: Error generating UUID for InsertSale-Event") + return nil, err + } + + // Publish Insert-Event on Kafka Topic + kf.EventProducer() <- &esmodel.Event{ + EventAction: "insert", + CorrelationID: cid, + AggregateID: 7, + Data: saleJSON, + NanoTime: time.Now().UnixNano(), + UUID: eventID, + YearBucket: 2018, + } + + // Timeout Context + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + krChan, err := kf.EnsureConsumerIO(consTopic, consTopic, false, eventID) + if err != nil { + err = errors.Wrap(err, "InvInsertResponseHandler: Error creating ConsumerIO") + return nil, err + } + + select { + case <-ctx.Done(): + return nil, errors.New("Timed out") + case kr := <-krChan: + insertInvResp := handleInsertInvResponse(kr) + if insertInvResp != nil { + if insertInvResp.Err == nil { + return insertInvResp.Result, nil + } + insertInvErr := insertInvResp.Err + err = errors.Wrap(insertInvErr.Err, "InvInsertResponseHandler: InsertSale Error") + log.Println(err) + outErr := fmt.Errorf("%d: InsertSale Error", insertInvErr.Code) + return nil, outErr + } + } + return nil, errors.New("Unknown Error") +} + +func handleInsertInvResponse(kr esmodel.KafkaResponse) *response.ResolverResponse { + if kr.Error != "" { + err := errors.Wrap( + errors.New(kr.Error), + "InvInsertResponseHandler: Error in KafkaResponse", + ) + log.Println(err) + krerr := gwerrors.NewKRError(err, kr.ErrorCode, err.Error()) + return &response.ResolverResponse{ + Result: nil, + Err: krerr, + } + } + + sale := map[string]interface{}{} + err := json.Unmarshal(kr.Result, &sale) + if err != nil { + err = errors.Wrap( + err, + "InvInsertResponseHandler: "+ + "Error while Unmarshalling KafkaResponse into flashsale", + ) + log.Println(err) + krerr := gwerrors.NewKRError(err, gwerrors.InternalError, err.Error()) + return &response.ResolverResponse{ + Result: nil, + Err: krerr, + } + } + + return &response.ResolverResponse{ + Result: sale, + Err: nil, + } +} diff --git a/gql/entity/flashsale/resolver/query.go b/gql/entity/flashsale/resolver/query.go new file mode 100644 index 0000000..7d5ccbc --- /dev/null +++ b/gql/entity/flashsale/resolver/query.go @@ -0,0 +1,140 @@ +package resolver + +import ( + "context" + "encoding/json" + "log" + "os" + "time" + + "github.com/TerrexTech/go-apigateway/gql/response" + "github.com/TerrexTech/go-apigateway/gwerrors" + + "github.com/TerrexTech/go-apigateway/util" + esmodel "github.com/TerrexTech/go-eventstore-models/model" + "github.com/TerrexTech/uuuid" + "github.com/graphql-go/graphql" + "github.com/pkg/errors" +) + +// genericQuery is a generic-resolver for Sale GraphQL-query. +// Other queries call this function. +var genericQuery = func(serviceAction string, params graphql.ResolveParams) (interface{}, error) { + consTopic := os.Getenv("KAFKA_CONSUMER_TOPIC_SALE") + + rootValue := params.Info.RootValue.(map[string]interface{}) + kf := rootValue["kafkaFactory"].(*util.KafkaFactory) + + paramsJSON, err := json.Marshal(params.Args) + if err != nil { + err = errors.Wrap(err, "SaleQueryResolver: Error marshalling params into JSON") + return nil, err + } + + // CorrelationID + cid, err := uuuid.NewV4() + if err != nil { + err = errors.Wrap(err, "SaleQueryResolver: Error generating UUID for cid") + return nil, err + } + uuid, err := uuuid.NewV4() + if err != nil { + err = errors.Wrap(err, "SaleQueryResolver: Error generating UUID fozr Query-Event") + return nil, err + } + + // log.Println("+++++++++++++++") + // log.Println(uuid.String()) + // Publish Auth-Request on Kafka Topic + kf.EventProducer() <- &esmodel.Event{ + EventAction: "query", + CorrelationID: cid, + AggregateID: 3, + Data: paramsJSON, + NanoTime: time.Now().UnixNano(), + ServiceAction: serviceAction, + UUID: uuid, + YearBucket: 2018, + } + + cio, err := kf.EnsureConsumerIO(consTopic, consTopic, false, uuid) + if err != nil { + err = errors.Wrap(err, "SaleQueryResolver: Error creating ConsumerIO") + return nil, err + } + // Timeout Context + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + +authResponseLoop: + // Check auth-response messages for matching CorrelationID and return result + for { + select { + case <-ctx.Done(): + break authResponseLoop + case msg := <-cio: + authRes := handleSaleQueryResponse(msg) + if authRes != nil { + if authRes.Err == nil { + return authRes.Result, nil + } + return nil, errors.New(authRes.Err.Error()) + } + } + } + + return nil, errors.New("Timed out") +} + +func handleSaleQueryResponse(kr esmodel.KafkaResponse) *response.ResolverResponse { + if kr.Error != "" { + err := errors.New(kr.Error) + err = errors.Wrap(err, "SaleQueryResponseHandler: Error in KafkaResponse") + krerr := gwerrors.NewKRError(err, kr.ErrorCode, err.Error()) + return &response.ResolverResponse{ + Result: nil, + Err: krerr, + } + } + log.Println(string(kr.Result)) + + result := []interface{}{} + err := json.Unmarshal(kr.Result, &result) + if err != nil { + err = errors.Wrap( + err, + "SaleQueryResponseHandler: Error while Unmarshalling sale into KafkaResponse", + ) + log.Println(err) + krerr := gwerrors.NewKRError(err, gwerrors.InternalError, err.Error()) + return &response.ResolverResponse{ + Result: nil, + Err: krerr, + } + } + + m := []map[string]interface{}{} + for i, r := range result { + item, assertOK := r.(map[string]interface{}) + if !assertOK { + err = errors.New("error asserting item to map[string]interface{}") + err = errors.Wrapf( + err, + "SaleQueryResponseHandler: Error asserting item at index: %d", i, + ) + log.Println(err) + krerr := gwerrors.NewKRError(err, gwerrors.InternalError, err.Error()) + return &response.ResolverResponse{ + Result: nil, + Err: krerr, + } + } + + m = append(m, item) + } + + return &response.ResolverResponse{ + Result: m, + Err: nil, + } +} diff --git a/gql/entity/flashsale/resolver/query_count.go b/gql/entity/flashsale/resolver/query_count.go new file mode 100644 index 0000000..ba0dfa2 --- /dev/null +++ b/gql/entity/flashsale/resolver/query_count.go @@ -0,0 +1,15 @@ +package resolver + +import ( + "github.com/graphql-go/graphql" + "github.com/pkg/errors" +) + +// QueryCount returns the latest N sales. +var QueryCount = func(params graphql.ResolveParams) (interface{}, error) { + result, err := genericQuery("count", params) + if err != nil { + err = errors.Wrap(err, "Error in QueryCount") + } + return result, err +} diff --git a/gql/entity/flashsale/types.go b/gql/entity/flashsale/types.go new file mode 100644 index 0000000..3f76311 --- /dev/null +++ b/gql/entity/flashsale/types.go @@ -0,0 +1,46 @@ +package flashsale + +import ( + "github.com/graphql-go/graphql" +) + +// Sale is the GraphQL-Type for Sale model. +var Sale = graphql.NewObject( + graphql.ObjectConfig{ + Name: "FlashSale", + Fields: graphql.Fields{ + "flashSaleID": &graphql.Field{ + Type: graphql.String, + }, + "items": &graphql.Field{ + Type: graphql.NewList(SaleInput), + }, + "timestamp": &graphql.Field{ + Type: graphql.Float, + }, + }, + }, +) + +var SaleInput = graphql.NewInputObject( + graphql.InputObjectConfig{ + Name: "FlashSaleInput", + Fields: graphql.InputObjectConfigFieldMap{ + "itemID": &graphql.InputObjectFieldConfig{ + Type: graphql.String, + }, + "upc": &graphql.InputObjectFieldConfig{ + Type: graphql.String, + }, + "weight": &graphql.InputObjectFieldConfig{ + Type: graphql.Float, + }, + "lot": &graphql.InputObjectFieldConfig{ + Type: graphql.String, + }, + "sku": &graphql.InputObjectFieldConfig{ + Type: graphql.String, + }, + }, + }, +) diff --git a/gql/schema/mutation.go b/gql/schema/mutation.go index a44a22d..1f55412 100644 --- a/gql/schema/mutation.go +++ b/gql/schema/mutation.go @@ -2,6 +2,7 @@ package schema import ( "github.com/TerrexTech/go-apigateway/gql/entity/auth" + "github.com/TerrexTech/go-apigateway/gql/entity/flashsale" "github.com/TerrexTech/go-apigateway/gql/entity/inventory" "github.com/TerrexTech/go-apigateway/gql/entity/sale" "github.com/graphql-go/graphql" @@ -14,5 +15,6 @@ var RootMutation = graphql.NewObject(graphql.ObjectConfig{ auth.Mutations, inventory.Mutations, sale.Mutations, + flashsale.Mutations, ), })