Type-safe Event Sourcing and CQRS with Node.JS and TypeScript
Note: `createDomain` will be migrating to `createDomainV2` in version 11.x. The `createDomainV2` API solves circular reference issues when importing aggregates. The original `createDomain` will be available as `createDomainV1` from 11.x onwards.
I regularly use event sourcing and wanted to lower the barrier to entry and increase productivity for colleagues.
The design goals were:
- Provide as much type safety and inference as possible
- Make creating domains quick and intuitive
- Be easy to test
- Allow developers to focus on application/business problems instead of Event Sourcing and CQRS problems
To achieve these goals the design is highly opinionated, but still flexible.
See Providers for more details and examples
- Postgres using Postgres.js
- Postgres using node-postgres
- SQLite, MySQL, Postgres using Knex
- In-memory
- MongoDB
- Neo4j v3.5
- Neo4j v4
See the documentation for information about aggregate persistence. This refers to persisting a copy of the aggregate on events for performant retrieval.
EvtStore is type-driven to take advantage of type safety and auto completion. We front-load the creation of our `Event`, `Aggregate`, and `Command` types to avoid having to repeatedly import and pass them as generic arguments. EvtStore makes use of TypeScript's mapped types and conditional types to achieve this.
/** Events for a user, discriminated by `type`. */
type UserEvt =
  | { type: 'created', name: string }
  | { type: 'disabled' }
  | { type: 'enabled' }

/** Aggregate state for a user. */
type UserAgg = { name: string, enabled: boolean }

/** Commands that can be issued against a user, discriminated by `type`. */
type UserCmd =
  | { type: 'create', name: string }
  | { type: 'enable' }
  | { type: 'disable' }
/** Events for a post, discriminated by `type`. */
type PostEvt =
  | { type: 'postCreated', userId: string, content: string }
  | { type: 'postArchived' }

/** Aggregate state for a post. */
type PostAgg = { userId: string, content: string, archived: boolean }

/**
 * Commands that can be issued against a post, discriminated by `type`.
 * Command names are imperative verbs (`createPost`, `archivePost`).
 */
type PostCmd =
  | { type: 'createPost', userId: string, content: string }
  | { type: 'archivePost', userId: string }
/**
 * User aggregate on the 'users' stream.
 * `create` supplies the initial state; `fold` returns the fields set by each event.
 */
const user = createAggregate<UserEvt, UserAgg, 'users'>({
  stream: 'users',
  create: () => {
    return { name: '', enabled: false }
  },
  fold: (event) => {
    switch (event.type) {
      case 'created': {
        // A newly created user takes its name from the event and starts enabled.
        return { name: event.name, enabled: true }
      }
      case 'disabled': {
        return { enabled: false }
      }
      case 'enabled': {
        return { enabled: true }
      }
    }
  },
})
/**
 * Post aggregate on the 'posts' stream.
 * `create` supplies the initial state; `fold` returns the fields set by each event.
 */
const post = createAggregate<PostEvt, PostAgg, 'posts'>({
  stream: 'posts',
  create: () => ({ content: '', userId: '', archived: false }),
  fold: (evt) => {
    switch (evt.type) {
      case 'postCreated':
        return { userId: evt.userId, content: evt.content }
      case 'postArchived':
        return { archived: true }
    }
  },
})
// Storage provider handed to the domain.
const provider = createProvider()

// Build the domain from the aggregates, then re-export the two members used elsewhere.
const evtDomain = createDomain({ provider }, { user, post })
export const domain = evtDomain.domain
export const createHandler = evtDomain.createHandler
/**
 * Command handlers for the user aggregate, one per `UserCmd` type.
 * Type arguments are <event, aggregate, command>; handler bodies are elided in this example.
 */
export const userCmd = createCommands<UserEvt, UserAgg, UserCmd>(domain.user, {
  async create(cmd, agg) { ... },
  async disable(cmd, agg) { ... },
  async enable(cmd, agg) { ... },
})
/**
 * Command handlers for the post aggregate.
 * Each handler receives the command and the current aggregate, and either
 * returns the event to record, returns nothing, or throws a CommandError.
 */
export const postCmd = createCommands<PostEvt, PostAgg, PostCmd>(domain.post, {
  async createPost(cmd, agg) {
    // A truthy version means the post aggregate already exists.
    if (agg.version) {
      throw new CommandError('Post already exists')
    }

    // The referenced user must exist before a post can be created for them.
    const author = await domain.user.getAggregate(cmd.userId)
    if (!author.version) {
      throw new CommandError('Unauthorized')
    }

    const { userId, content } = cmd
    return { type: 'postCreated', content, userId }
  },

  async archivePost(cmd, agg) {
    const isOwner = cmd.userId === agg.userId
    if (!isOwner) {
      throw new CommandError('Not allowed')
    }

    // Already archived: nothing to record.
    if (!agg.archived) {
      return { type: 'postArchived' }
    }
    return
  }
})
// Event handler named 'posts-model' that listens to the 'posts' stream.
const postModel = createHandler('posts-model', ['posts'], {
  // First start only: the handler begins at the end of the stream(s) history
  tailStream: false,

  // Every start: the handler begins at the end of the stream(s) history
  alwaysTailStream: false,

  // Events whose handling throws an error are skipped
  continueOnError: false,
})

// Register the callback for 'postCreated' events on the 'posts' stream.
postModel.handle('posts', 'postCreated', async (id, evt, metadata) => {
  // Insert into database
})

postModel.start()
See API