
Commit

Merge #18
18: Emit EOS-Event as final Event for request r=Jaskaranbir a=Jaskaranbir



Co-authored-by: Jaskaranbir <jaskaranbir.dhillon@gmail.com>
ninja-bruh and Jaskaranbir committed Nov 24, 2018
2 parents ca9b75d + dd38f3c commit c6c9333
Showing 7 changed files with 106 additions and 31 deletions.
1 change: 1 addition & 0 deletions .env
@@ -17,3 +17,4 @@ KAFKA_CONSUMER_GROUP=esquery.request.1
KAFKA_CONSUMER_TOPIC=esquery.request

KAFKA_EVENT_BATCH_SIZE=6
KAFKA_END_OF_STREAM_TOKEN=__eos__
17 changes: 10 additions & 7 deletions README.md
@@ -9,20 +9,23 @@ This is achieved by comparing Aggregate's current version in its database to the

* The `events_meta` table contains the aggregate-version for new events. New [events][0] take their version from this table. Check the [Go-EventPersistence][1] service.

* The actual events are stored in `events` table (event-store).
* The actual events are stored in `events` table (Event-Store).

* Every Aggregate maintains its own version. The Aggregate-ID and its current version is sent as request via Kafka-topic `events.rns_eventstore.eventsquery` to this service.
* Every Aggregate maintains its own version. The Aggregate-ID and its current version are sent as a request via the Kafka-topic `esquery.request` to this service.

* This service compares the Aggregate's version with the version in `events_meta`. The version in `events_meta` is then incremented by 1, so the new events will get the new version.

* All the events from event-store (`events` table) with version > Aggregate-version and < event_meta version are batched into chunks and sent via [Kafka-response][2] on the topic `events.rns_eventstore.eventsresponse.<aggregate-id>`.
* All the events from the event-store (`events` table) with version > Aggregate-version and < `events_meta` version are batched into chunks and sent via [Kafka-response][2] on the topic provided in [EventStoreQuery][3].

* These events are to be consumed by the Aggregate service and processed, and the Aggregate will make required changes in its projection.
* The end of the Event-Stream is signalled by an extra Event whose `Action` is the end-of-stream token (`__eos__` by default) and whose `UUID` matches the `CorrelationID` provided in `EventStoreQuery`.

Check [.env][3] and [docker-compose.yaml][4] (docker-compose is only used in tests as of yet) files for default configurations (including the Cassandra Keyspace/Table used).
* These events are to be consumed and processed by the Aggregate service, which then makes the required changes in its projection. The Aggregate must stop listening for messages once the end-of-stream Event is received, since that Event signals completion of the request (see the consumer sketch below).

Check the [.env][4] and [docker-compose.yaml][5] files (docker-compose is only used in tests so far) for default configurations (including the Cassandra Keyspace/Table used).

[0]: https://github.com/TerrexTech/go-common-models/blob/master/models/event.go
[1]: https://github.com/TerrexTech/go-eventpersistence/
[2]: https://github.com/TerrexTech/go-common-models/blob/master/models/kafka_response.go
[3]: https://github.com/TerrexTech/go-eventstore-query/blob/master/.env
[4]: https://github.com/TerrexTech/go-eventstore-query/blob/master/test/docker-compose.yaml
[3]: https://github.com/TerrexTech/go-common-models/blob/master/model/eventstore_query.go
[4]: https://github.com/TerrexTech/go-eventstore-query/blob/master/.env
[5]: https://github.com/TerrexTech/go-eventstore-query/blob/master/test/docker-compose.yaml
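To make the consumer contract above concrete, here is a minimal sketch of an Aggregate-side listener that drains batched events until the EOS Event for its request arrives. `consumeUntilEOS`, the channel wiring, and the import paths are assumptions for illustration (the `model.Event`/`model.KafkaResponse` shapes follow the go-common-models links above); this helper is not part of this repository.

```go
package consumer

import (
	"encoding/json"
	"log"

	"github.com/TerrexTech/go-common-models/model"
	"github.com/TerrexTech/uuuid"
)

// consumeUntilEOS collects events from batched Kafka-responses until the
// end-of-stream Event for this request arrives. Hypothetical helper for
// illustration; the channel wiring is assumed, not part of this service.
func consumeUntilEOS(
	responses <-chan model.KafkaResponse,
	correlationID uuuid.UUID,
	eosToken string, // value of KAFKA_END_OF_STREAM_TOKEN, e.g. "__eos__"
) ([]model.Event, error) {
	collected := []model.Event{}

	for response := range responses {
		events := []model.Event{}
		if err := json.Unmarshal(response.Data, &events); err != nil {
			return nil, err
		}
		for _, event := range events {
			// The EOS Event signals completion: its Action is the EOS token
			// and its UUID carries the request's CorrelationID.
			// String comparison avoids assuming the UUID type is comparable.
			if event.Action == eosToken &&
				event.UUID.String() == correlationID.String() {
				log.Println("End-of-stream Event received; stopping listener")
				return collected, nil
			}
			collected = append(collected, event)
		}
	}
	return collected, nil
}
```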
38 changes: 27 additions & 11 deletions ioutil/queryutil.go
@@ -10,32 +10,43 @@ import (
"github.com/pkg/errors"
)

// QueryUtilConfig is the configuration for QueryUtil.
type QueryUtilConfig struct {
EOSToken string
// EventStore provides convenient functions to get Aggregate data.
EventStore EventStore
BatchSize int
Logger tlog.Logger
ServiceName string
}

// QueryUtil handles the EventStore-Query.
// This fetches the new Aggregate-Events, updates the Aggregate version in Events-Meta
// table, and sends those events in batches to the Kafka response-topic.
type QueryUtil struct {
// EventStore provides convenient functions to get Aggregate data.
EventStore EventStore
BatchSize int
Logger tlog.Logger
*QueryUtilConfig
}

// NewQueryUtil creates a QueryUtil.
func NewQueryUtil(es EventStore, batchSize int, logger tlog.Logger) (*QueryUtil, error) {
if es == nil {
func NewQueryUtil(config *QueryUtilConfig) (*QueryUtil, error) {
if config.EventStore == nil {
return nil, errors.New("config error: EventStore cannot be nil")
}
if batchSize == 0 {
if config.BatchSize == 0 {
return nil, errors.New("config error: BatchSize must be >0")
}
if logger == nil {
if config.EOSToken == "" {
return nil, errors.New("config error: EOSToken cannot be blank")
}
if config.Logger == nil {
return nil, errors.New("config error: Logger cannot be nil")
}
if config.ServiceName == "" {
return nil, errors.New("config error: ServiceName cannot be blank")
}

return &QueryUtil{
EventStore: es,
BatchSize: batchSize,
Logger: logger,
config,
}, nil
}

@@ -89,6 +100,11 @@ func (qu *QueryUtil) CreateBatch(
Description: "Sorting events. Events after sort:",
}, events)

events = append(events, model.Event{
Action: qu.EOSToken,
Source: qu.ServiceName,
UUID: correlationID,
})
for i := 0; i < len(events); i += qu.BatchSize {
batchEndIndex := i + qu.BatchSize
if batchEndIndex > len(events) {
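Because the EOS Event is appended before the batching loop runs, it is always the last element of the final chunk; with the default `KAFKA_EVENT_BATCH_SIZE=6`, a query yielding exactly six events produces a second batch containing only the EOS Event. A standalone sketch of that chunking arithmetic, with plain strings standing in for `model.Event`:

```go
package main

import "fmt"

// batch splits items into chunks of at most size elements, mirroring the
// CreateBatch loop above: the EOS marker is appended before chunking, so it
// is always the last element of the last chunk.
func batch(items []string, size int) [][]string {
	items = append(items, "__eos__") // EOS Event appended before the loop
	chunks := [][]string{}
	for i := 0; i < len(items); i += size {
		end := i + size
		if end > len(items) {
			end = len(items)
		}
		chunks = append(chunks, items[i:end])
	}
	return chunks
}

func main() {
	events := []string{"e1", "e2", "e3", "e4", "e5", "e6"}
	for i, chunk := range batch(events, 6) {
		fmt.Printf("batch %d: %v\n", i, chunk)
	}
	// Prints:
	// batch 0: [e1 e2 e3 e4 e5 e6]
	// batch 1: [__eos__]
}
```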
44 changes: 39 additions & 5 deletions ioutil/queryutil_test.go
@@ -20,6 +20,7 @@ import (
var _ = Describe("QueryUtil", func() {
var (
eventStore *mock.MEventStore
eosToken string
mockEvents []model.Event
mockMetaVer int64
mockEventStoreQuery *model.EventStoreQuery
@@ -41,6 +42,7 @@
}

BeforeEach(func() {
eosToken = os.Getenv("KAFKA_END_OF_STREAM_TOKEN")
mockEventStoreQuery = &model.EventStoreQuery{
AggregateID: 12,
AggregateVersion: 3,
@@ -87,7 +89,13 @@
},
}

qu, err := NewQueryUtil(eventStore, 6, &mock.Logger{})
qu, err := NewQueryUtil(&QueryUtilConfig{
EOSToken: eosToken,
EventStore: eventStore,
BatchSize: 6,
Logger: &mock.Logger{},
ServiceName: "test-service",
})
Expect(err).ToNot(HaveOccurred())
_, err = qu.QueryHandler(mockEventStoreQuery)
Expect(err).ToNot(HaveOccurred())
@@ -100,7 +108,13 @@
},
}

qu, err := NewQueryUtil(eventStore, 6, &mock.Logger{})
qu, err := NewQueryUtil(&QueryUtilConfig{
EOSToken: eosToken,
EventStore: eventStore,
BatchSize: 6,
Logger: &mock.Logger{},
ServiceName: "test-service",
})
Expect(err).ToNot(HaveOccurred())
_, err = qu.QueryHandler(mockEventStoreQuery)
Expect(err).To(HaveOccurred())
@@ -117,7 +131,13 @@
},
}

qu, err := NewQueryUtil(eventStore, 6, &mock.Logger{})
qu, err := NewQueryUtil(&QueryUtilConfig{
EOSToken: eosToken,
EventStore: eventStore,
BatchSize: 6,
Logger: &mock.Logger{},
ServiceName: "test-service",
})
Expect(err).ToNot(HaveOccurred())
_, err = qu.QueryHandler(mockEventStoreQuery)
Expect(err).To(HaveOccurred())
@@ -140,11 +160,21 @@
batchSize, err := strconv.Atoi(batchSizeStr)
Expect(err).ToNot(HaveOccurred())

qu, err := NewQueryUtil(eventStore, batchSize, &mock.Logger{})
qu, err := NewQueryUtil(&QueryUtilConfig{
EOSToken: eosToken,
EventStore: eventStore,
BatchSize: batchSize,
Logger: &mock.Logger{},
ServiceName: "test-service",
})
Expect(err).ToNot(HaveOccurred())
// Listener for batch events (batch events generated below)
var wg sync.WaitGroup
wg.Add(1)

cid, err := uuuid.NewV4()
Expect(err).ToNot(HaveOccurred())

// Here we get the Document from the response channel, unmarshal events array
// from it, and then match the resulting events with mock-events slice
go func() {
@@ -176,14 +206,18 @@
}
// All events matched, good stuff
if matchCount == len(mockEvents) {
// Final Event should have EOSToken as its Action
eosEvent := resEvents[len(resEvents)-1]
Expect(eosEvent.Action).To(Equal(eosToken))
Expect(eosEvent.UUID).To(Equal(cid))
close(responseChan)
}
}
wg.Done()
}()

// BatchEvents produced here
docs := qu.CreateBatch(37, uuuid.UUID{}, mockEvents)
docs := qu.CreateBatch(37, cid, mockEvents)
for _, doc := range docs {
responseChan <- doc
}
4 changes: 2 additions & 2 deletions main/event_handler.go
@@ -132,7 +132,7 @@ func (e *esQueryHandler) ConsumeClaim(
CorrelationID: docCID,
Data: msg.Value,
Error: err.Error(),
ErrorCode: 1,
ErrorCode: model.InternalError,
Source: e.ServiceName,
Topic: esQuery.Topic,
UUID: esQuery.CorrelationID,
@@ -158,7 +158,7 @@
CorrelationID: docCID,
Data: msg.Value,
Error: err.Error(),
ErrorCode: 1,
ErrorCode: model.InternalError,
Source: e.ServiceName,
Topic: esQuery.Topic,
UUID: esQuery.CorrelationID,
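This change replaces the magic number `1` with `model.InternalError`, a named constant from go-common-models. The constant's definition is not part of this diff; a hypothetical sketch of the pattern, with illustrative values only:

```go
package model

// Error codes carried in KafkaResponse.ErrorCode. Named constants keep
// producers and consumers agreeing on semantics without magic numbers.
// (Values here are illustrative; the actual go-common-models definitions
// are not shown in this diff.)
const (
	InternalError = 1 // unexpected service-side failure
	UserError     = 2 // malformed or invalid request
)
```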
10 changes: 9 additions & 1 deletion main/main.go
@@ -45,6 +45,7 @@ func validateEnv() {
"KAFKA_BROKERS",
"KAFKA_CONSUMER_GROUP",
"KAFKA_CONSUMER_TOPIC",
"KAFKA_END_OF_STREAM_TOKEN",
)

if err != nil {
@@ -214,7 +215,14 @@ func main() {
batchSize = defaultSize
}

queryUtil, err := ioutil.NewQueryUtil(eventStore, batchSize, logger)
eosToken := os.Getenv("KAFKA_END_OF_STREAM_TOKEN")
queryUtil, err := ioutil.NewQueryUtil(&ioutil.QueryUtilConfig{
BatchSize: batchSize,
EOSToken: eosToken,
EventStore: eventStore,
Logger: logger,
ServiceName: serviceName,
})
if err != nil {
err = errors.Wrap(err, "Error creating QueryUtil")
logger.F(tlog.Entry{
23 changes: 18 additions & 5 deletions test/query_test.go
@@ -48,6 +48,7 @@ func TestEventQuery(t *testing.T) {
"KAFKA_BROKERS",
"KAFKA_CONSUMER_GROUP",
"KAFKA_CONSUMER_TOPIC",
"KAFKA_END_OF_STREAM_TOKEN",
)

if err != nil {
@@ -66,6 +67,8 @@ var _ = Describe("EventQuery", func() {
responseTopic string

mockEventStoreQuery *model.EventStoreQuery
eventStoreQueryCID uuuid.UUID
eosToken string
)

BeforeSuite(func() {
@@ -128,15 +131,15 @@
kafkaProducer, err := kafka.NewProducer(config)
Expect(err).ToNot(HaveOccurred())

cid, err := uuuid.NewV4()
eventStoreQueryCID, err = uuuid.NewV4()
Expect(err).ToNot(HaveOccurred())

var aggID int8 = 1
responseTopic = fmt.Sprintf("test.%d", aggID)
mockEventStoreQuery = &model.EventStoreQuery{
AggregateID: aggID,
AggregateVersion: 1,
CorrelationID: cid,
CorrelationID: eventStoreQueryCID,
Topic: responseTopic,
UUID: uuid,
YearBucket: mockEvent.YearBucket,
@@ -165,10 +168,13 @@

mockEventQueryInput <- kafka.CreateMessage(consumerTopic, testEventQuery)
log.Println("Produced mock-eventstore-query on consumer-topic")

eosToken = os.Getenv("KAFKA_END_OF_STREAM_TOKEN")
})
})

Context("An event is produced", func() {
Context("An EventQuery is produced", func() {
var responseConsumer *kafka.Consumer

BeforeEach(func() {
@@ -214,11 +220,18 @@
Expect(response.Error).To(BeEmpty())
log.Println("Event matched expectation")

// Unmarshal the Result from Kafka-Response
events := []model.Event{}
err = json.Unmarshal(response.Data, &events)
Expect(err).ToNot(HaveOccurred())
return true

// Ensure we get the EOSEvent
for _, event := range events {
if event.Action == eosToken {
log.Println("Event has EOS Action")
Expect(event.UUID).To(Equal(eventStoreQueryCID))
return true
}
}
}
return false
}
