Skip to content

Commit

Permalink
add a type property for event
Browse files Browse the repository at this point in the history
  • Loading branch information
thiagobustamante committed Jun 12, 2020
1 parent c9e7f1f commit 8914594
Show file tree
Hide file tree
Showing 12 changed files with 105 additions and 24 deletions.
3 changes: 2 additions & 1 deletion src/event-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ export interface EventStream {
/**
* Add a new event to the end of the event stream.
* @param data The event data
* @param type The Event type
* @return The event, updated with informations like its sequence order and commitTimestamp
*/
addEvent(data: any): Promise<Event>;
addEvent(data: any, type?: string): Promise<Event>;
}
5 changes: 3 additions & 2 deletions src/event-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,11 @@ export class EventStreamImpl implements EventStream {
/**
* Add a new event to the end of the event stream.
* @param data The event data
* @param type The Event type
* @return The event, updated with informations like its sequence order and commitTimestamp
*/
public async addEvent(data: any) {
const addedEvent: Event = await this.getProvider().addEvent(this.stream, data);
public async addEvent(data: any, type?: string) {
const addedEvent: Event = await this.getProvider().addEvent(this.stream, data, type);
if (this.eventStore.publisher) {
await (this.eventStore.publisher as Publisher).publish({
event: addedEvent,
Expand Down
4 changes: 4 additions & 0 deletions src/model/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,8 @@ export interface Event {
* The sequence order for the event in the {@link EventStream}
*/
sequence?: number;
/**
* The Event type
*/
type?: string;
}
5 changes: 3 additions & 2 deletions src/provider/memory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ import { PersistenceProvider } from './provider';
export class InMemoryProvider implements PersistenceProvider {
private store: Map<string, Map<string, Array<Event>>> = new Map();

public async addEvent(stream: Stream, data: any) {
public async addEvent(stream: Stream, data: any, type = '') {
const currentEvents = await this.getEventsList(stream.aggregation, stream.id);
const event: Event = {
commitTimestamp: new Date().getTime(),
payload: data,
sequence: currentEvents.length
sequence: currentEvents.length,
type: type
};
currentEvents.push(event);
return event;
Expand Down
5 changes: 3 additions & 2 deletions src/provider/mongo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ export class MongoProvider implements PersistenceProvider {
this.mongoURL = url;
}

public async addEvent(stream: Stream, data: any) {
public async addEvent(stream: Stream, data: any, type = '') {
const events = await this.events();
const sequence = await this.getNextSequenceValue(this.getKey(stream.aggregation, stream.id)) - 1;
const commitTimestamp = new Date().getTime();
const event: Event = {
commitTimestamp: commitTimestamp,
payload: data,
sequence: sequence
sequence: sequence,
type: type
};

const result = await events.insertOne(_.merge(event, { stream: stream }));
Expand Down
15 changes: 9 additions & 6 deletions src/provider/mysql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,20 @@ export class MySQLProvider implements PersistenceProvider {
this.mysql = new MySQL(config);
}

public async addEvent(stream: Stream, data: any) {
public async addEvent(stream: Stream, data: any, type = '') {
await this.ensureTables();
let result = await this.mysql.query('INSERT INTO events(streamId, aggregation, payload, sequence) ' +
'SELECT ?,?,?,COUNT(*) FROM events ' +
let result = await this.mysql.query('INSERT INTO events(streamId, aggregation, payload, type, sequence) ' +
'SELECT ?,?,?,?,COUNT(*) FROM events ' +
'WHERE streamId = ? AND aggregation = ?',
[stream.id, stream.aggregation, JSON.stringify(data), stream.id, stream.aggregation]);
[stream.id, stream.aggregation, JSON.stringify(data), type, stream.id, stream.aggregation]);

result = await this.mysql.query('SELECT sequence, commitTimestamp FROM events WHERE id=?', [result.insertId]);

const event: Event = {
commitTimestamp: result.commitTimestamp,
payload: data,
sequence: result.sequence
sequence: result.sequence,
type: type
};
return event;
}
Expand All @@ -43,7 +44,8 @@ export class MySQLProvider implements PersistenceProvider {
return {
commitTimestamp: data.commitTimestamp,
payload: JSON.parse(data.payload),
sequence: data.sequence
sequence: data.sequence,
type: data.type
};
});
}
Expand Down Expand Up @@ -79,6 +81,7 @@ export class MySQLProvider implements PersistenceProvider {
+ 'streamId VARCHAR(40) NOT NULL,'
+ 'aggregation VARCHAR(40) NOT NULL,'
+ 'payload TEXT,'
+ 'type VARCHAR(40),'
+ 'sequence INT,'
+ 'commitTimestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,'
+ 'PRIMARY KEY (id),'
Expand Down
3 changes: 2 additions & 1 deletion src/provider/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ export interface PersistenceProvider {
* Add a new {@link Event} in the {@link EventStream}
* @param stream The associated stream
* @param data The Event data
* @param type The Event type
* @return The updated event, after persisted.
*/
addEvent(stream: Stream, data: any): Promise<Event>;
addEvent(stream: Stream, data: any, type?: string): Promise<Event>;

/**
* Retrieves a ranged list of events in the {@link EventStream}
Expand Down
5 changes: 3 additions & 2 deletions src/provider/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ export class RedisProvider implements PersistenceProvider {
this.redis = RedisFactory.createClient(config);
}

public async addEvent(stream: Stream, data: any) {
public async addEvent(stream: Stream, data: any, type = '') {
const sequence = await this.redis.incr(`sequences:{${this.getKey(stream.aggregation, stream.id)}}`) - 1;
const time = await this.redis.time();
const commitTimestamp = parseInt(time, 10);
const event: Event = {
commitTimestamp: commitTimestamp,
payload: data,
sequence: sequence
sequence: sequence,
type: type
};
await this.redis.multi()
.rpush(this.getKey(stream.aggregation, stream.id), JSON.stringify(event))
Expand Down
4 changes: 3 additions & 1 deletion test/unit/provider/memory-provider.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ describe('EventStory Memory Provider', () => {
});

it('should be able to read events from the EventStream', async () => {
await ordersStream.addEvent(EVENT_PAYLOAD);
await ordersStream.addEvent(EVENT_PAYLOAD, 'evtType');
await ordersStream.addEvent(EVENT_PAYLOAD + '_1');
const events = await ordersStream.getEvents();
expect(events.length).toEqual(2);
expect(events[0].payload).toEqual(EVENT_PAYLOAD);
expect(events[0].sequence).toEqual(0);
expect(events[0].type).toEqual('evtType');
expect(events[1].type).toEqual('');
});

it('should be able to get event ranged list from the event stream', async () => {
Expand Down
23 changes: 21 additions & 2 deletions test/unit/provider/mongo-provider.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ describe('EventStory Mongo Provider', () => {

const mongoURL = 'mongodb://localhost:27017/eventstore';
const mongoProvider = new MongoProvider(mongoURL);
const event = await mongoProvider.addEvent({ aggregation: 'orders', id: '1' }, 'EVENT PAYLOAD');
const type = 'evtType';
const event = await mongoProvider.addEvent({ aggregation: 'orders', id: '1' }, 'EVENT PAYLOAD', type);

expect(mongoClientConnectMock).toBeCalledTimes(1);
expect(mongoClientConnectMock).toBeCalledWith(mongoURL, { useNewUrlParser: true });
Expand All @@ -186,6 +187,7 @@ describe('EventStory Mongo Provider', () => {
});
expect(collectionMock.insertOne).toBeCalledWith(
expect.objectContaining({
type: type,
commitTimestamp: expect.anything(),
payload: 'EVENT PAYLOAD',
sequence: 0,
Expand Down Expand Up @@ -252,7 +254,6 @@ describe('EventStory Mongo Provider', () => {
sequence: 0,
stream: expect.objectContaining({ aggregation: 'orders', id: '1' })
}));
// expect(event.commitTimestamp).toEqual(1);
});

it('should only initiate collections once', async () => {
Expand All @@ -271,4 +272,22 @@ describe('EventStory Mongo Provider', () => {
expect(dbMock.collection).toBeCalledWith('counters');
});

it('should use empty as default event type', async () => {
// eslint-disable-next-line @typescript-eslint/camelcase
collectionMock.findOneAndUpdate.mockResolvedValue({ value: { sequence_value: 1 }, ok: true });
collectionMock.insertOne.mockResolvedValue({ result: { ok: true } });

const mongoProvider = new MongoProvider('mongodb://localhost:27017/eventstore');
await mongoProvider.addEvent({ aggregation: 'orders', id: '1' }, 'EVENT PAYLOAD');

expect(collectionMock.insertOne).toBeCalledWith(
expect.objectContaining({
type: '',
commitTimestamp: expect.anything(),
payload: 'EVENT PAYLOAD',
sequence: 0,
stream: expect.objectContaining({ aggregation: 'orders', id: '1' })
}));
});

});
33 changes: 29 additions & 4 deletions test/unit/provider/mysql-provider.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const ensureTablesSQL = 'CREATE TABLE IF NOT EXISTS events ('
+ 'streamId VARCHAR(40) NOT NULL,'
+ 'aggregation VARCHAR(40) NOT NULL,'
+ 'payload TEXT,'
+ 'type VARCHAR(40),'
+ 'sequence INT,'
+ 'commitTimestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,'
+ 'PRIMARY KEY (id),'
Expand Down Expand Up @@ -156,19 +157,43 @@ describe('EventStory Redis Provider', () => {
mySqlMock.query.mockResolvedValueOnce({ insertId: '1234' });
mySqlMock.query.mockResolvedValueOnce({ commitTimestamp: 'timestamp', sequence: 111 });

const type = 'evtType';
const mySQLProvider = new MySQLProvider({});
const event = await mySQLProvider.addEvent({ aggregation: 'orders', id: '1' }, 'EVENT PAYLOAD', type);
expect(mySqlMock.query).toBeCalledWith(ensureTablesSQL);
expect(mySqlMock.query).toBeCalledWith('INSERT INTO events(streamId, aggregation, payload, type, sequence) ' +
'SELECT ?,?,?,?,COUNT(*) FROM events ' +
'WHERE streamId = ? AND aggregation = ?',
['1', 'orders', JSON.stringify('EVENT PAYLOAD'), type, '1', 'orders']);
expect(mySqlMock.query).toBeCalledWith('SELECT sequence, commitTimestamp FROM events WHERE id=?',
['1234']);
expect(event).toEqual({
commitTimestamp: 'timestamp',
payload: 'EVENT PAYLOAD',
sequence: 111,
type: type
});
});

it('should use empty as default event type', async () => {
mySqlMock.query.mockResolvedValueOnce({});
mySqlMock.query.mockResolvedValueOnce({ insertId: '1234' });
mySqlMock.query.mockResolvedValueOnce({ commitTimestamp: 'timestamp', sequence: 111 });

const mySQLProvider = new MySQLProvider({});
const event = await mySQLProvider.addEvent({ aggregation: 'orders', id: '1' }, 'EVENT PAYLOAD');
expect(mySqlMock.query).toBeCalledWith(ensureTablesSQL);
expect(mySqlMock.query).toBeCalledWith('INSERT INTO events(streamId, aggregation, payload, sequence) ' +
'SELECT ?,?,?,COUNT(*) FROM events ' +
expect(mySqlMock.query).toBeCalledWith('INSERT INTO events(streamId, aggregation, payload, type, sequence) ' +
'SELECT ?,?,?,?,COUNT(*) FROM events ' +
'WHERE streamId = ? AND aggregation = ?',
['1', 'orders', JSON.stringify('EVENT PAYLOAD'), '1', 'orders']);
['1', 'orders', JSON.stringify('EVENT PAYLOAD'), '', '1', 'orders']);
expect(mySqlMock.query).toBeCalledWith('SELECT sequence, commitTimestamp FROM events WHERE id=?',
['1234']);
expect(event).toEqual({
commitTimestamp: 'timestamp',
payload: 'EVENT PAYLOAD',
sequence: 111
sequence: 111,
type: ''
});
});
});
24 changes: 23 additions & 1 deletion test/unit/provider/redis-provider.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,34 @@ describe('EventStory Redis Provider', () => {
redisMock.rpush.mockReturnValue(redisMock);
redisMock.zadd.mockReturnValue(redisMock);

const type = 'evtType';
const redisProvider = new RedisProvider({ standalone: { host: 'localhost' } });
const event = await redisProvider.addEvent({ aggregation: 'orders', id: '1' }, 'EVENT PAYLOAD', type);
expect(redisMock.incr).toBeCalledWith('sequences:{orders:1}');
expect(redisMock.time).toBeCalledTimes(1);
expect(redisMock.multi).toBeCalledTimes(1);
expect(redisMock.rpush).toBeCalledWith('orders:1', '{"commitTimestamp":1,"payload":"EVENT PAYLOAD","sequence":0,"type":"evtType"}');
expect(redisMock.zadd).toBeCalledTimes(2);
expect(redisMock.zadd).toBeCalledWith('meta:aggregations', '1', 'orders');
expect(redisMock.zadd).toBeCalledWith('meta:aggregations:orders', '1', '1');
expect(redisMock.exec).toBeCalledTimes(1);
expect(event.sequence).toEqual(0);
expect(event.commitTimestamp).toEqual(1);
});

it('should use empty as default event type', async () => {
redisMock.incr.mockResolvedValue(1);
redisMock.time.mockResolvedValue(1);
redisMock.multi.mockReturnValue(redisMock);
redisMock.rpush.mockReturnValue(redisMock);
redisMock.zadd.mockReturnValue(redisMock);

const redisProvider = new RedisProvider({ standalone: { host: 'localhost' } });
const event = await redisProvider.addEvent({ aggregation: 'orders', id: '1' }, 'EVENT PAYLOAD');
expect(redisMock.incr).toBeCalledWith('sequences:{orders:1}');
expect(redisMock.time).toBeCalledTimes(1);
expect(redisMock.multi).toBeCalledTimes(1);
expect(redisMock.rpush).toBeCalledWith('orders:1', '{"commitTimestamp":1,"payload":"EVENT PAYLOAD","sequence":0}');
expect(redisMock.rpush).toBeCalledWith('orders:1', '{"commitTimestamp":1,"payload":"EVENT PAYLOAD","sequence":0,"type":""}');
expect(redisMock.zadd).toBeCalledTimes(2);
expect(redisMock.zadd).toBeCalledWith('meta:aggregations', '1', 'orders');
expect(redisMock.zadd).toBeCalledWith('meta:aggregations:orders', '1', '1');
Expand Down

0 comments on commit 8914594

Please sign in to comment.