akka-persistence-postgresql is a journal and snapshot store plugin for akka-persistence using Postgresql. It uses Slick to talk to a Postgresql database.
Scala 2.11 and 2.12, Java 8, Akka 2.4.x, and Slick 3.1.1 and 3.2.x are supported.
The plugin supports the following functionality:
- serialization of events and snapshots to binary (bytea) or json (json/jsonb) format
- rich tagging of events using hstore format
- pluggable write-strategies
- serialization of snapshots to binary (bytea) or json (json/jsonb) format (available since release 0.9.0)
- transactional event-sourcing
Versions: The table below lists the versions and their main dependencies.

| Version to use | Scala 2.11 | Scala 2.12 | Scala 2.13 | Akka  | Slick |
|----------------|------------|------------|------------|-------|-------|
| 0.5.0          | ✓          |            |            | 2.4.x | 3.1.x |
| 0.6.0          | ✓          | ✓          |            | 2.4.x | 3.2.0 |
| 0.7.0          | ✓          | ✓          |            | 2.5.x | 3.2.0 |
| 0.8.0          | ✓          | ✓          |            | 2.5.x | 3.2.1 |
| 0.9.0          | ✓          | ✓          |            | 2.5.x | 3.2.1 |
| 0.11.0         | ✓          | ✓          |            | 2.5.x | 3.2.2 |
| 0.13.0         | ✓          | ✓          |            | 2.5.x | 3.3.0 |
| 0.14.0         | ✓          | ✓          | ✓          | 2.5.x | 3.3.2 |
Add the following dependency to your SBT build, replacing version with one of the versions listed above:
libraryDependencies += "be.wegenenverkeer" %% "akka-persistence-pg" % version
Add the following to your application.conf:

    akka {
      persistence {
        journal.plugin = "pg-journal"
        snapshot-store.plugin = "pg-snapshot"
      }
    }

    pg-persistence {
      db {
        user = "myuser"
        password = "mypassword"
        url = "jdbc:postgresql://localhost:5432/mydb"
      }
    }
This configures Akka to use the journal and snapshot plugin and configures the database to use for storing the events and snapshots. This is the most basic configuration and it will just use the standard Akka serialization format to store events and snapshots in binary format.
Take a look at the reference.conf for additional configuration options.
The plugin requires two tables to be present in the database, one for the events and one for the snapshots. You need to create these tables and indices, either manually or through database migration or setup scripts.
You also need to install the hstore extension for your database:
    CREATE EXTENSION HSTORE;

    CREATE TABLE journal (
      "id" BIGSERIAL NOT NULL PRIMARY KEY,
      "persistenceid" VARCHAR(254) NOT NULL,
      "sequencenr" INT NOT NULL,
      "rowid" BIGINT DEFAULT NULL,
      "deleted" BOOLEAN DEFAULT false,
      "payload" BYTEA,
      "manifest" VARCHAR(512),
      "uuid" VARCHAR(36) NOT NULL,
      "writeruuid" VARCHAR(36) NOT NULL,
      "created" timestamptz NOT NULL,
      "tags" HSTORE,
      "event" JSON,
      constraint "cc_journal_payload_event" check (payload IS NOT NULL OR event IS NOT NULL)
    );

    CREATE UNIQUE INDEX journal_pidseq_idx ON journal (persistenceid, sequencenr);
    CREATE UNIQUE INDEX journal_rowid_idx ON journal (rowid);

    CREATE TABLE snapshot (
      "persistenceid" VARCHAR(254) NOT NULL,
      "sequencenr" INT NOT NULL,
      "timestamp" bigint NOT NULL,
      "snapshot" BYTEA,
      "manifest" VARCHAR(512),
      "json" JSON,
      CONSTRAINT "cc_snapshot_payload_json" check (snapshot IS NOT NULL OR (json IS NOT NULL AND manifest IS NOT NULL)),
      PRIMARY KEY (persistenceid, sequencenr)
    );
Since version 9.4, Postgresql supports storing json using either the json or the jsonb format. The plugin needs to know how to encode and decode your events to and from json, so you need to provide and configure an implementation of JsonEncoder:
    trait JsonEncoder {

      /**
        * A partial function that serializes an event to a json representation
        * @return the json representation
        */
      def toJson: PartialFunction[Any, JsonString]

      /**
        * A partial function that deserializes an event from a json representation
        * @return the event
        */
      def fromJson: PartialFunction[(JsonString, Class[_]), AnyRef]

    }
Your JsonEncoder just needs to implement these two partial functions. Check out the TestEventEncoder class for a simple example using the standard library’s included json parser. You can use any json library (play-json, spray-json, circe, …) you want and we recommend using the one you are most comfortable using.
You should also adapt application.conf so the plugin knows about your encoder:
pg-persistence.eventstore.encoder: "com.example.MyEventEncoder"
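As an illustration, a minimal encoder might look like the sketch below. It is self-contained, so `JsonString` and `JsonEncoder` are redeclared here as local stand-ins (an assumption; in a real project you would import the plugin's own types), and `AccountOpened` is a hypothetical event. The hand-rolled regex parsing only keeps the example dependency-free; a real encoder would use a proper json library.

```scala
// ASSUMPTION: local stand-ins for the plugin's types, declared so the sketch
// compiles on its own. Only the two partial functions come from the text above.
final case class JsonString(value: String)

trait JsonEncoder {
  def toJson: PartialFunction[Any, JsonString]
  def fromJson: PartialFunction[(JsonString, Class[_]), AnyRef]
}

// Hypothetical domain event, used only for illustration.
final case class AccountOpened(owner: String, balance: Long)

class MyEventEncoder extends JsonEncoder {

  // Matches the flat object written by toJson, tolerating extra whitespace.
  // No string escaping is handled; use a real json library in practice.
  private val AccountOpenedJson =
    """\{\s*"owner"\s*:\s*"([^"]*)"\s*,\s*"balance"\s*:\s*(-?\d+)\s*\}""".r

  // Only events this encoder knows about are defined; everything else is
  // left undefined by the partial function.
  override def toJson: PartialFunction[Any, JsonString] = {
    case AccountOpened(owner, balance) =>
      JsonString(s"""{"owner":"$owner","balance":$balance}""")
  }

  // The Class argument tells the encoder which event type to rebuild.
  override def fromJson: PartialFunction[(JsonString, Class[_]), AnyRef] = {
    case (JsonString(AccountOpenedJson(owner, balance)), c)
        if c == classOf[AccountOpened] =>
      AccountOpened(owner, balance.toLong)
  }
}
```

Because both members are partial functions, the encoder only has to cover the event types it actually knows how to handle.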
Since version 0.9.0 you can also store snapshots as json instead of binary data.
You simply need another implementation of a JsonEncoder and register it in your application.conf:
pg-persistence.snapshotEncoder: "com.example.MySnapshotEncoder"
Decoding, either from json or using standard Akka serialization, may fail. For snapshot deserialization there is an option (since 0.10.0) to ignore these decoding failures. This is safe because snapshots are just an optimization: it should always be possible to reconstruct the state of the PersistentActor from its events.
To enable this behaviour, adapt application.conf:
pg-persistence.ignoreSnapshotDecodingFailure = true
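The reasoning behind this option can be sketched with a small, self-contained model. Nothing below is the plugin's code: `Counter`, `Incremented`, `decodeSnapshot` and `recover` are hypothetical names, and the model only shows that replaying all events yields the same state as starting from a snapshot.

```scala
import scala.util.{Failure, Success, Try}

object SnapshotFallbackSketch {
  // Hypothetical state and event of a counter-like persistent actor.
  final case class Counter(value: Long)
  final case class Incremented(by: Long)

  // Hypothetical decoder: fails on anything that is not a plain number.
  def decodeSnapshot(raw: String): Try[Counter] = Try(Counter(raw.trim.toLong))

  // Some(state) if the snapshot decodes, None if the failure is ignored,
  // otherwise the decoding error is rethrown.
  def loadSnapshot(raw: String, ignoreDecodingFailure: Boolean): Option[Counter] =
    decodeSnapshot(raw) match {
      case Success(state)                      => Some(state)
      case Failure(_) if ignoreDecodingFailure => None
      case Failure(error)                      => throw error
    }

  // Start from the snapshot when there is one, otherwise from the empty
  // state, and replay the events the starting state does not cover yet.
  def recover(
      rawSnapshot: String,
      eventsInSnapshot: Int,
      events: Seq[Incremented],
      ignoreDecodingFailure: Boolean
  ): Counter = {
    val (start, remaining) = loadSnapshot(rawSnapshot, ignoreDecodingFailure) match {
      case Some(state) => (state, events.drop(eventsInSnapshot))
      case None        => (Counter(0L), events)
    }
    remaining.foldLeft(start)((state, event) => Counter(state.value + event.by))
  }
}
```

With a readable snapshot and with an unreadable one that is ignored, `recover` ends in the same state; the only difference is how many events have to be replayed.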
You can also tag your events. Most persistence plugins only support a simple form of tagging where tags are just plain strings. But since Postgresql supports the hstore format, this plugin uses more sophisticated tags, where a tag is a key/value pair.
The "default" configured tagger will examine each event and see if it implements the akka.persistence.pg.event.Tagged trait. If it does, it will call the tags method to retrieve the tags associated with the event and store these together with the event.
You can also use your own tagger, by implementing the akka.persistence.pg.event.EventTagger trait and configuring the plugin to use it.
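A minimal sketch of key/value tagging follows. The `Tagged` trait is redeclared locally so the example stands on its own; its shape (a `tags` method returning a `Map[String, String]`) is an assumption based on the description above, and `OrderShipped`, `Heartbeat` and `tagsFor` are hypothetical names.

```scala
object TaggingSketch {
  // ASSUMPTION: stand-in for akka.persistence.pg.event.Tagged. The real
  // trait's exact signature may differ.
  trait Tagged {
    def tags: Map[String, String]
  }

  // Hypothetical event that exposes two key/value tags.
  final case class OrderShipped(orderId: String, country: String) extends Tagged {
    override def tags: Map[String, String] =
      Map("orderId" -> orderId, "country" -> country)
  }

  // Hypothetical event without tags.
  final case class Heartbeat(at: Long)

  // Mimics the behaviour described for the default tagger: events that
  // implement Tagged contribute their tags, all other events have none.
  def tagsFor(event: Any): Map[String, String] = event match {
    case tagged: Tagged => tagged.tags
    case _              => Map.empty
  }
}
```

Because each tag is a key/value pair rather than a plain string, a query can later select on the key as well as on the value.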
The plugin supports the Persistence query API, mostly used in CQRS applications to transform/migrate the events from the write side to the read side.
The ReadJournal is retrieved via the akka.persistence.query.PersistenceQuery extension:
    import akka.persistence.query.PersistenceQuery
    import akka.persistence.pg.journal.query.PostgresReadJournal

    val readJournal = PersistenceQuery(system).readJournalFor[PostgresReadJournal](PostgresReadJournal.Identifier)
All queries are live streams: they are not completed when they reach the end of the currently stored events, but continue to push new events as they are persisted. The Postgresql write journal notifies the query side as soon as events are persisted, but for efficiency reasons the query side retrieves the events in batches, which can sometimes be delayed by up to the configured refresh-interval. The stream is completed with failure if there is a failure in executing the query in the backend journal.
allEvents is used for retrieving all events
Each event stored by the write side gets a unique id assigned by a sequence. You can retrieve a subset of all events by specifying fromRowId and toRowId, or use 0L and Long.MaxValue respectively to retrieve all events. Note that the corresponding rowId of each event is provided in the EventEnvelope, which makes it possible to resume the stream at a later point from a given rowId. The returned event stream is ordered by rowId. The same stream of elements (in the same order) is returned for multiple executions of the query, except when events have been deleted.
eventsByPersistenceId is used for retrieving events for a specific PersistentActor identified by its persistenceId
You can retrieve a subset of all events by specifying fromSequenceNr and toSequenceNr, or use 0L and Long.MaxValue respectively to retrieve all events. Note that the corresponding sequence number of each event is provided in the EventEnvelope, which makes it possible to resume the stream at a later point from a given sequence number. The returned event stream is ordered by sequence number, i.e. the same order as the PersistentActor persisted the events. The same stream of elements (in the same order) is returned for multiple executions of the query, except when events have been deleted.
eventsByTags is used for retrieving events that were marked with a given set of tags
You can retrieve a subset of all events by specifying offset, or use 0L to retrieve all events with a given tag. The offset corresponds to the global id of an event. Note that the corresponding offset of each event is provided in the EventEnvelope, which makes it possible to resume the stream at a later point from a given offset. In addition to the offset, the EventEnvelope also provides persistenceId and sequenceNr for each event. The sequenceNr is the sequence number for the persistent actor with the persistenceId that persisted the event. Together, persistenceId and sequenceNr form a unique identifier for the event.
The returned event stream is ordered by the offset (global id), which corresponds to the same order as the write journal stored the events. The same stream of elements (in the same order) is returned for multiple executions of the query.
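Resuming a query from a stored offset can be modelled without Akka. The sketch below is not the plugin's API: `Envelope` is a simplified stand-in for the EventEnvelope, `allEvents` here is a plain function over an in-memory list, and treating both bounds as inclusive is an assumption.

```scala
object ResumeSketch {
  // Simplified stand-in for the EventEnvelope. Field names are assumptions.
  final case class Envelope(offset: Long, persistenceId: String, sequenceNr: Long, event: String)

  // In-memory model of the query: events whose offset lies within the given
  // bounds (assumed inclusive), ordered by offset.
  def allEvents(journal: Seq[Envelope], fromRowId: Long, toRowId: Long): Seq[Envelope] =
    journal.filter(e => e.offset >= fromRowId && e.offset <= toRowId).sortBy(_.offset)

  val journal: Seq[Envelope] = Seq(
    Envelope(1L, "A", 1L, "a1"),
    Envelope(2L, "B", 1L, "b1"),
    Envelope(3L, "A", 2L, "a2"),
    Envelope(4L, "B", 2L, "b2")
  )

  // First run: the consumer handles two events and remembers the last offset.
  val firstRun: Seq[Envelope] = allEvents(journal, 0L, Long.MaxValue).take(2)
  val lastHandled: Long = firstRun.last.offset

  // Later run: resume just past the stored offset, so nothing is handled twice.
  val secondRun: Seq[Envelope] = allEvents(journal, lastHandled + 1, Long.MaxValue)
}
```

In a read-side projection the stored offset would typically be persisted together with the projected data, so that both advance in the same transaction.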
Each event stored by the journal plugin gets a unique id assigned by a Postgresql sequence. When using a naive approach to storing events, there is a possibility of missing events during event querying.
Imagine the following scenario:
- Two persistent actors, A and B, each want to store an event (eventA and eventB) using the journal.
- Two transactions are started almost simultaneously. The first transaction (storing eventA) starts first and gets id = 1001 from the sequence. The second transaction (storing eventB) gets id = 1002.
- For some reason, however, the second transaction gets committed first. The journal table now has an entry with 1002 as its highest id.
- The persistence query gets notified and reads this event with id = 1002 from the journal.
- Now the first transaction commits and stores the event with id = 1001 in the journal.
- The persistence query gets notified again, but since it has already seen an event with id = 1002, it will not see any events with lower ids.
- You have missed the event with id = 1001, unless you query again with a lower starting offset.
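The scenario can be reproduced with a small in-memory model. This is a sketch, not the plugin's implementation: `Row`, `NaiveReader` and `run` are hypothetical names, and the reader simply remembers the highest id it has seen. The ids used are 1001 for eventA and 1002 for eventB.

```scala
import scala.collection.mutable.ArrayBuffer

object MissedEventSketch {
  final case class Row(id: Long, event: String)

  // Naive reader: remembers the highest id it has seen so far and only asks
  // for rows with a higher id on the next poll.
  final class NaiveReader {
    private var lastSeenId = 0L

    def poll(journal: Seq[Row]): Seq[Row] = {
      val fresh = journal.filter(_.id > lastSeenId).sortBy(_.id)
      if (fresh.nonEmpty) lastSeenId = fresh.last.id
      fresh
    }
  }

  // Returns what the reader saw on its first and on its second poll.
  def run(): (Seq[String], Seq[String]) = {
    val journal = ArrayBuffer.empty[Row]
    val reader  = new NaiveReader

    // The sequence handed out 1001 (eventA) and 1002 (eventB),
    // but the transaction storing eventB commits first.
    journal += Row(1002L, "eventB")
    val firstPoll = reader.poll(journal.toSeq).map(_.event)

    // The transaction storing eventA commits afterwards, with the lower id.
    journal += Row(1001L, "eventA")
    val secondPoll = reader.poll(journal.toSeq).map(_.event)

    (firstPoll, secondPoll)
  }
}
```

The second poll comes back empty: eventA is in the journal, but its id is below the reader's high-water mark, so the reader never sees it.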
In order to prevent this from happening the plugin supports pluggable write strategies.
You can configure which write strategy to use in application.conf.
This strategy takes a write lock on the journal table at the start of the transaction. It effectively serializes each transaction and only a single transaction is executed at a time.
This is the default strategy and although it has a lower throughput than the other strategies it is very easy to use.
This strategy just stores the events in the naive way, but the 'id' column is not used during querying. Instead, after each event is stored, a notification is sent to a RowIdUpdating actor, which updates the 'rowid' column for all events where it was 'null', using the 'id' column only to determine the ordering. The persistence query plugin will now automatically use the 'rowid' column instead of the 'id' column. It is now impossible for an event to appear in the journal afterwards with a 'rowid' lower than the maximum 'rowid' already present.
This strategy has a better throughput than the TableLockingStrategy, but the latency between storing events and them being available for querying is a bit higher.
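The effect of the 'rowid' column can be illustrated with a small model. This is a sketch under assumptions, not the RowIdUpdating actor itself: `Row`, `assignRowIds` and `run` are hypothetical names, and numbering rowids consecutively from 1 is chosen only for the example.

```scala
object RowIdUpdatingSketch {
  final case class Row(id: Long, event: String, rowid: Option[Long])

  // Gives every row that has no rowid yet the next free rowid, handing them
  // out in 'id' order and continuing after the highest rowid already present.
  def assignRowIds(journal: Vector[Row]): Vector[Row] = {
    val maxRowId = journal.flatMap(_.rowid).foldLeft(0L)(_ max _)
    val pending  = journal.filter(_.rowid.isEmpty).sortBy(_.id)
    val assigned = pending.zipWithIndex.map { case (row, i) => row.id -> (maxRowId + i + 1) }.toMap
    journal.map { row =>
      row.rowid match {
        case Some(_) => row
        case None    => row.copy(rowid = assigned.get(row.id))
      }
    }
  }

  // Replays the out-of-order commit: id 1002 commits before id 1001.
  def run(): Vector[(String, Long)] = {
    // eventB (id 1002) is committed first and receives rowid 1.
    val afterB = assignRowIds(Vector(Row(1002L, "eventB", None)))
    // eventA (id 1001) is committed later and receives rowid 2, even
    // though its id is lower.
    val afterA = assignRowIds(afterB :+ Row(1001L, "eventA", None))
    afterA.sortBy(_.rowid).map(r => r.event -> r.rowid.get)
  }
}
```

A reader that tracks the highest 'rowid' it has seen therefore still picks up eventA, because late commits always receive a higher 'rowid' than anything already visible.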
This strategy also allows only a single transaction to proceed at a time, but it batches multiple events in a single transaction. It achieves high throughput, but since it changes the transaction boundary, it is not always the recommended strategy.