diff --git a/src/event-store.ts b/src/event-store.ts index 6d2fcad..c5658e7 100644 --- a/src/event-store.ts +++ b/src/event-store.ts @@ -98,7 +98,8 @@ export interface EventStream { /** * Add a new event to the end of the event stream. * @param data The event data + * @param type The Event type * @return The event, updated with informations like its sequence order and commitTimestamp */ - addEvent(data: any): Promise; + addEvent(data: any, type?: string): Promise; } diff --git a/src/event-stream.ts b/src/event-stream.ts index cc0a944..8bb25c3 100644 --- a/src/event-stream.ts +++ b/src/event-stream.ts @@ -44,10 +44,11 @@ export class EventStreamImpl implements EventStream { /** * Add a new event to the end of the event stream. * @param data The event data + * @param type The Event type * @return The event, updated with informations like its sequence order and commitTimestamp */ - public async addEvent(data: any) { - const addedEvent: Event = await this.getProvider().addEvent(this.stream, data); + public async addEvent(data: any, type?: string) { + const addedEvent: Event = await this.getProvider().addEvent(this.stream, data, type); if (this.eventStore.publisher) { await (this.eventStore.publisher as Publisher).publish({ event: addedEvent, diff --git a/src/model/event.ts b/src/model/event.ts index f5efe99..b9932ba 100644 --- a/src/model/event.ts +++ b/src/model/event.ts @@ -14,4 +14,8 @@ export interface Event { * The sequence order for the event in the {@link EventStream} */ sequence?: number; + /** + * The Event type + */ + type?: string; } diff --git a/src/provider/memory.ts b/src/provider/memory.ts index 3e539ce..e4a9548 100644 --- a/src/provider/memory.ts +++ b/src/provider/memory.ts @@ -10,12 +10,13 @@ import { PersistenceProvider } from './provider'; export class InMemoryProvider implements PersistenceProvider { private store: Map>> = new Map(); - public async addEvent(stream: Stream, data: any) { + public async addEvent(stream: Stream, data: any, type = '') { const currentEvents = await this.getEventsList(stream.aggregation, stream.id); const event: Event = { commitTimestamp: new Date().getTime(), payload: data, - sequence: currentEvents.length + sequence: currentEvents.length, + type: type }; currentEvents.push(event); return event; diff --git a/src/provider/mongo.ts b/src/provider/mongo.ts index 4f6e11b..95b7bab 100644 --- a/src/provider/mongo.ts +++ b/src/provider/mongo.ts @@ -17,14 +17,15 @@ export class MongoProvider implements PersistenceProvider { this.mongoURL = url; } - public async addEvent(stream: Stream, data: any) { + public async addEvent(stream: Stream, data: any, type = '') { const events = await this.events(); const sequence = await this.getNextSequenceValue(this.getKey(stream.aggregation, stream.id)) - 1; const commitTimestamp = new Date().getTime(); const event: Event = { commitTimestamp: commitTimestamp, payload: data, - sequence: sequence + sequence: sequence, + type: type }; const result = await events.insertOne(_.merge(event, { stream: stream })); diff --git a/src/provider/mysql.ts b/src/provider/mysql.ts index bb97a74..ae49ca1 100644 --- a/src/provider/mysql.ts +++ b/src/provider/mysql.ts @@ -15,19 +15,20 @@ export class MySQLProvider implements PersistenceProvider { this.mysql = new MySQL(config); } - public async addEvent(stream: Stream, data: any) { + public async addEvent(stream: Stream, data: any, type = '') { await this.ensureTables(); - let result = await this.mysql.query('INSERT INTO events(streamId, aggregation, payload, sequence) ' + - 'SELECT ?,?,?,COUNT(*) FROM events ' + + let result = await this.mysql.query('INSERT INTO events(streamId, aggregation, payload, type, sequence) ' + + 'SELECT ?,?,?,?,COUNT(*) FROM events ' + 'WHERE streamId = ? AND aggregation = ?', - [stream.id, stream.aggregation, JSON.stringify(data), stream.id, stream.aggregation]); + [stream.id, stream.aggregation, JSON.stringify(data), type, stream.id, stream.aggregation]); result = await this.mysql.query('SELECT sequence, commitTimestamp FROM events WHERE id=?', [result.insertId]); const event: Event = { commitTimestamp: result.commitTimestamp, payload: data, - sequence: result.sequence + sequence: result.sequence, + type: type }; return event; } @@ -43,7 +44,8 @@ export class MySQLProvider implements PersistenceProvider { return { commitTimestamp: data.commitTimestamp, payload: JSON.parse(data.payload), - sequence: data.sequence + sequence: data.sequence, + type: data.type }; }); } @@ -79,6 +81,7 @@ export class MySQLProvider implements PersistenceProvider { + 'streamId VARCHAR(40) NOT NULL,' + 'aggregation VARCHAR(40) NOT NULL,' + 'payload TEXT,' + + 'type VARCHAR(40),' + 'sequence INT,' + 'commitTimestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,' + 'PRIMARY KEY (id),' diff --git a/src/provider/provider.ts b/src/provider/provider.ts index 47ccc38..cbf388d 100644 --- a/src/provider/provider.ts +++ b/src/provider/provider.ts @@ -11,9 +11,10 @@ export interface PersistenceProvider { * Add a new {@link Event} in the {@link EventStream} * @param stream The associated stream * @param data The Event data + * @param type The Event type * @return The updated event, after persisted. */ - addEvent(stream: Stream, data: any): Promise; + addEvent(stream: Stream, data: any, type?: string): Promise; /** * Retrieves a ranged list of events in the {@link EventStream} diff --git a/src/provider/redis.ts b/src/provider/redis.ts index 7b345a9..33aaafe 100644 --- a/src/provider/redis.ts +++ b/src/provider/redis.ts @@ -15,14 +15,15 @@ export class RedisProvider implements PersistenceProvider { this.redis = RedisFactory.createClient(config); } - public async addEvent(stream: Stream, data: any) { + public async addEvent(stream: Stream, data: any, type = '') { const sequence = await this.redis.incr(`sequences:{${this.getKey(stream.aggregation, stream.id)}}`) - 1; const time = await this.redis.time(); const commitTimestamp = parseInt(time, 10); const event: Event = { commitTimestamp: commitTimestamp, payload: data, - sequence: sequence + sequence: sequence, + type: type }; await this.redis.multi() .rpush(this.getKey(stream.aggregation, stream.id), JSON.stringify(event)) diff --git a/test/unit/provider/memory-provider.spec.ts b/test/unit/provider/memory-provider.spec.ts index 00f3414..35999d5 100644 --- a/test/unit/provider/memory-provider.spec.ts +++ b/test/unit/provider/memory-provider.spec.ts @@ -20,12 +20,14 @@ describe('EventStory Memory Provider', () => { }); it('should be able to read events from the EventStream', async () => { - await ordersStream.addEvent(EVENT_PAYLOAD); + await ordersStream.addEvent(EVENT_PAYLOAD, 'evtType'); await ordersStream.addEvent(EVENT_PAYLOAD + '_1'); const events = await ordersStream.getEvents(); expect(events.length).toEqual(2); expect(events[0].payload).toEqual(EVENT_PAYLOAD); expect(events[0].sequence).toEqual(0); + expect(events[0].type).toEqual('evtType'); + expect(events[1].type).toEqual(''); }); it('should be able to get event ranged list from the event stream', async () => { diff --git a/test/unit/provider/mongo-provider.spec.ts b/test/unit/provider/mongo-provider.spec.ts index 6c87c97..ade3c99 100644 --- a/test/unit/provider/mongo-provider.spec.ts +++ b/test/unit/provider/mongo-provider.spec.ts @@ -168,7 +168,8 @@ describe('EventStory Mongo Provider', () => { const mongoURL = 'mongodb://localhost:27017/eventstore'; const mongoProvider = new MongoProvider(mongoURL); - const event = await mongoProvider.addEvent({ aggregation: 'orders', id: '1' }, 'EVENT PAYLOAD'); + const type = 'evtType'; + const event = await mongoProvider.addEvent({ aggregation: 'orders', id: '1' }, 'EVENT PAYLOAD', type); expect(mongoClientConnectMock).toBeCalledTimes(1); expect(mongoClientConnectMock).toBeCalledWith(mongoURL, { useNewUrlParser: true }); @@ -186,6 +187,7 @@ describe('EventStory Mongo Provider', () => { }); expect(collectionMock.insertOne).toBeCalledWith( expect.objectContaining({ + type: type, commitTimestamp: expect.anything(), payload: 'EVENT PAYLOAD', sequence: 0, @@ -252,7 +254,6 @@ describe('EventStory Mongo Provider', () => { sequence: 0, stream: expect.objectContaining({ aggregation: 'orders', id: '1' }) })); - // expect(event.commitTimestamp).toEqual(1); }); it('should only initiate collections once', async () => { @@ -271,4 +272,22 @@ describe('EventStory Mongo Provider', () => { expect(dbMock.collection).toBeCalledWith('counters'); }); + it('should use empty as default event type', async () => { + // eslint-disable-next-line @typescript-eslint/camelcase + collectionMock.findOneAndUpdate.mockResolvedValue({ value: { sequence_value: 1 }, ok: true }); + collectionMock.insertOne.mockResolvedValue({ result: { ok: true } }); + + const mongoProvider = new MongoProvider('mongodb://localhost:27017/eventstore'); + await mongoProvider.addEvent({ aggregation: 'orders', id: '1' }, 'EVENT PAYLOAD'); + + expect(collectionMock.insertOne).toBeCalledWith( + expect.objectContaining({ + type: '', + commitTimestamp: expect.anything(), + payload: 'EVENT PAYLOAD', + sequence: 0, + stream: expect.objectContaining({ aggregation: 'orders', id: '1' }) + })); + }); + }); diff --git a/test/unit/provider/mysql-provider.spec.ts b/test/unit/provider/mysql-provider.spec.ts index 0460a75..141feeb 100644 --- a/test/unit/provider/mysql-provider.spec.ts +++ b/test/unit/provider/mysql-provider.spec.ts @@ -13,6 +13,7 @@ const ensureTablesSQL = 'CREATE TABLE IF NOT EXISTS events (' + 'streamId VARCHAR(40) NOT NULL,' + 'aggregation VARCHAR(40) NOT NULL,' + 'payload TEXT,' + + 'type VARCHAR(40),' + 'sequence INT,' + 'commitTimestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,' + 'PRIMARY KEY (id),' @@ -156,19 +157,43 @@ describe('EventStory Redis Provider', () => { mySqlMock.query.mockResolvedValueOnce({ insertId: '1234' }); mySqlMock.query.mockResolvedValueOnce({ commitTimestamp: 'timestamp', sequence: 111 }); + const type = 'evtType'; + const mySQLProvider = new MySQLProvider({}); + const event = await mySQLProvider.addEvent({ aggregation: 'orders', id: '1' }, 'EVENT PAYLOAD', type); + expect(mySqlMock.query).toBeCalledWith(ensureTablesSQL); + expect(mySqlMock.query).toBeCalledWith('INSERT INTO events(streamId, aggregation, payload, type, sequence) ' + + 'SELECT ?,?,?,?,COUNT(*) FROM events ' + + 'WHERE streamId = ? AND aggregation = ?', + ['1', 'orders', JSON.stringify('EVENT PAYLOAD'), type, '1', 'orders']); + expect(mySqlMock.query).toBeCalledWith('SELECT sequence, commitTimestamp FROM events WHERE id=?', + ['1234']); + expect(event).toEqual({ + commitTimestamp: 'timestamp', + payload: 'EVENT PAYLOAD', + sequence: 111, + type: type + }); + }); + + it('should use empty as default event type', async () => { + mySqlMock.query.mockResolvedValueOnce({}); + mySqlMock.query.mockResolvedValueOnce({ insertId: '1234' }); + mySqlMock.query.mockResolvedValueOnce({ commitTimestamp: 'timestamp', sequence: 111 }); + const mySQLProvider = new MySQLProvider({}); const event = await mySQLProvider.addEvent({ aggregation: 'orders', id: '1' }, 'EVENT PAYLOAD'); expect(mySqlMock.query).toBeCalledWith(ensureTablesSQL); - expect(mySqlMock.query).toBeCalledWith('INSERT INTO events(streamId, aggregation, payload, sequence) ' + - 'SELECT ?,?,?,COUNT(*) FROM events ' + + expect(mySqlMock.query).toBeCalledWith('INSERT INTO events(streamId, aggregation, payload, type, sequence) ' + + 'SELECT ?,?,?,?,COUNT(*) FROM events ' + 'WHERE streamId = ? AND aggregation = ?', - ['1', 'orders', JSON.stringify('EVENT PAYLOAD'), '1', 'orders']); + ['1', 'orders', JSON.stringify('EVENT PAYLOAD'), '', '1', 'orders']); expect(mySqlMock.query).toBeCalledWith('SELECT sequence, commitTimestamp FROM events WHERE id=?', ['1234']); expect(event).toEqual({ commitTimestamp: 'timestamp', payload: 'EVENT PAYLOAD', - sequence: 111 + sequence: 111, + type: '' }); }); }); diff --git a/test/unit/provider/redis-provider.spec.ts b/test/unit/provider/redis-provider.spec.ts index 77d94b5..3e27a38 100644 --- a/test/unit/provider/redis-provider.spec.ts +++ b/test/unit/provider/redis-provider.spec.ts @@ -99,12 +99,34 @@ describe('EventStory Redis Provider', () => { redisMock.rpush.mockReturnValue(redisMock); redisMock.zadd.mockReturnValue(redisMock); + const type = 'evtType'; + const redisProvider = new RedisProvider({ standalone: { host: 'localhost' } }); + const event = await redisProvider.addEvent({ aggregation: 'orders', id: '1' }, 'EVENT PAYLOAD', type); + expect(redisMock.incr).toBeCalledWith('sequences:{orders:1}'); + expect(redisMock.time).toBeCalledTimes(1); + expect(redisMock.multi).toBeCalledTimes(1); + expect(redisMock.rpush).toBeCalledWith('orders:1', '{"commitTimestamp":1,"payload":"EVENT PAYLOAD","sequence":0,"type":"evtType"}'); + expect(redisMock.zadd).toBeCalledTimes(2); + expect(redisMock.zadd).toBeCalledWith('meta:aggregations', '1', 'orders'); + expect(redisMock.zadd).toBeCalledWith('meta:aggregations:orders', '1', '1'); + expect(redisMock.exec).toBeCalledTimes(1); + expect(event.sequence).toEqual(0); + expect(event.commitTimestamp).toEqual(1); + }); + + it('should use empty as default event type', async () => { + redisMock.incr.mockResolvedValue(1); + redisMock.time.mockResolvedValue(1); + redisMock.multi.mockReturnValue(redisMock); + redisMock.rpush.mockReturnValue(redisMock); + redisMock.zadd.mockReturnValue(redisMock); + const redisProvider = new RedisProvider({ standalone: { host: 'localhost' } }); const event = await redisProvider.addEvent({ aggregation: 'orders', id: '1' }, 'EVENT PAYLOAD'); expect(redisMock.incr).toBeCalledWith('sequences:{orders:1}'); expect(redisMock.time).toBeCalledTimes(1); expect(redisMock.multi).toBeCalledTimes(1); - expect(redisMock.rpush).toBeCalledWith('orders:1', '{"commitTimestamp":1,"payload":"EVENT PAYLOAD","sequence":0}'); + expect(redisMock.rpush).toBeCalledWith('orders:1', '{"commitTimestamp":1,"payload":"EVENT PAYLOAD","sequence":0,"type":""}'); expect(redisMock.zadd).toBeCalledTimes(2); expect(redisMock.zadd).toBeCalledWith('meta:aggregations', '1', 'orders'); expect(redisMock.zadd).toBeCalledWith('meta:aggregations:orders', '1', '1');