Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: simplify Repository and Serde traits, introduce AnyRepository, remove ProtobufSerde #270

Merged
merged 4 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 11 additions & 26 deletions eventually-postgres/src/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@

use async_trait::async_trait;
use eventually::aggregate::Aggregate;
use eventually::serde::{Deserializer, Serde, Serializer};
use eventually::serde::Serde;
use eventually::version::Version;
use eventually::{aggregate, version};
use sqlx::{PgPool, Postgres, Row};

#[derive(Debug, Clone)]
pub struct Repository<T, OutT, OutEvt, TSerde, EvtSerde>

Check warning on line 11 in eventually-postgres/src/aggregate.rs

View workflow job for this annotation

GitHub Actions / clippy

missing documentation for a struct

warning: missing documentation for a struct --> eventually-postgres/src/aggregate.rs:11:1 | 11 | pub struct Repository<T, OutT, OutEvt, TSerde, EvtSerde> | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
where
T: Aggregate,
<T as Aggregate>::Id: ToString,
OutT: From<T>,
OutEvt: From<T::Event>,
TSerde: Serde<OutT>,
EvtSerde: Serializer<OutEvt>,
EvtSerde: Serde<OutEvt>,
{
pool: PgPool,
aggregate_serde: TSerde,
Expand All @@ -32,13 +32,13 @@
OutT: From<T>,
OutEvt: From<T::Event>,
TSerde: Serde<OutT>,
EvtSerde: Serializer<OutEvt>,
EvtSerde: Serde<OutEvt>,
{
pub async fn new(
pool: PgPool,
aggregate_serde: TSerde,
event_serde: EvtSerde,
) -> Result<Self, sqlx::migrate::MigrateError> {

Check warning on line 41 in eventually-postgres/src/aggregate.rs

View workflow job for this annotation

GitHub Actions / clippy

missing documentation for an associated function

warning: missing documentation for an associated function --> eventually-postgres/src/aggregate.rs:37:5 | 37 | / pub async fn new( 38 | | pool: PgPool, 39 | | aggregate_serde: TSerde, 40 | | event_serde: EvtSerde, 41 | | ) -> Result<Self, sqlx::migrate::MigrateError> { | |__________________________________________________^
// Make sure the latest migrations are used before using the Repository instance.
crate::MIGRATIONS.run(&pool).await?;

Expand All @@ -54,40 +54,40 @@
}

#[derive(Debug, thiserror::Error)]
pub enum GetError {

Check warning on line 57 in eventually-postgres/src/aggregate.rs

View workflow job for this annotation

GitHub Actions / clippy

missing documentation for an enum

warning: missing documentation for an enum --> eventually-postgres/src/aggregate.rs:57:1 | 57 | pub enum GetError { | ^^^^^^^^^^^^^^^^^
#[error("failed to fetch the aggregate state row: {0}")]
FetchAggregateRow(#[source] sqlx::Error),

Check warning on line 59 in eventually-postgres/src/aggregate.rs

View workflow job for this annotation

GitHub Actions / clippy

missing documentation for a variant

warning: missing documentation for a variant --> eventually-postgres/src/aggregate.rs:59:5 | 59 | FetchAggregateRow(#[source] sqlx::Error), | ^^^^^^^^^^^^^^^^^
#[error("failed to deserialize the aggregate state from the database row: {0}")]
DeserializeAggregate(#[source] Box<dyn std::error::Error + Send + Sync + 'static>),

Check warning on line 61 in eventually-postgres/src/aggregate.rs

View workflow job for this annotation

GitHub Actions / clippy

missing documentation for a variant

warning: missing documentation for a variant --> eventually-postgres/src/aggregate.rs:61:5 | 61 | DeserializeAggregate(#[source] Box<dyn std::error::Error + Send + Sync + 'static>), | ^^^^^^^^^^^^^^^^^^^^
#[error("failed to convert the aggregate state into its domain type: {0}")]
ConvertAggregate(#[source] Box<dyn std::error::Error + Send + Sync + 'static>),

Check warning on line 63 in eventually-postgres/src/aggregate.rs

View workflow job for this annotation

GitHub Actions / clippy

missing documentation for a variant

warning: missing documentation for a variant --> eventually-postgres/src/aggregate.rs:63:5 | 63 | ConvertAggregate(#[source] Box<dyn std::error::Error + Send + Sync + 'static>), | ^^^^^^^^^^^^^^^^
#[error("database returned an error: {0}")]
Database(#[from] sqlx::Error),

Check warning on line 65 in eventually-postgres/src/aggregate.rs

View workflow job for this annotation

GitHub Actions / clippy

missing documentation for a variant

warning: missing documentation for a variant --> eventually-postgres/src/aggregate.rs:65:5 | 65 | Database(#[from] sqlx::Error), | ^^^^^^^^
}

#[derive(Debug, thiserror::Error)]
pub enum SaveError {

Check warning on line 69 in eventually-postgres/src/aggregate.rs

View workflow job for this annotation

GitHub Actions / clippy

missing documentation for an enum

warning: missing documentation for an enum --> eventually-postgres/src/aggregate.rs:69:1 | 69 | pub enum SaveError { | ^^^^^^^^^^^^^^^^^^
#[error("failed to begin a new transaction: {0}")]
BeginTransaction(#[source] sqlx::Error),

Check warning on line 71 in eventually-postgres/src/aggregate.rs

View workflow job for this annotation

GitHub Actions / clippy

missing documentation for a variant

warning: missing documentation for a variant --> eventually-postgres/src/aggregate.rs:71:5 | 71 | BeginTransaction(#[source] sqlx::Error), | ^^^^^^^^^^^^^^^^
#[error("conflict error detected: {0})")]
Conflict(#[source] version::ConflictError),

Check warning on line 73 in eventually-postgres/src/aggregate.rs

View workflow job for this annotation

GitHub Actions / clippy

missing documentation for a variant

warning: missing documentation for a variant --> eventually-postgres/src/aggregate.rs:73:5 | 73 | Conflict(#[source] version::ConflictError), | ^^^^^^^^
#[error("concurrent update detected, represented as a conflict error: {0})")]
Concurrency(#[source] version::ConflictError),

Check warning on line 75 in eventually-postgres/src/aggregate.rs

View workflow job for this annotation

GitHub Actions / clippy

missing documentation for a variant

warning: missing documentation for a variant --> eventually-postgres/src/aggregate.rs:75:5 | 75 | Concurrency(#[source] version::ConflictError), | ^^^^^^^^^^^
#[error("failed to save the new aggregate state: {0}")]
SaveAggregateState(#[source] sqlx::Error),

Check warning on line 77 in eventually-postgres/src/aggregate.rs

View workflow job for this annotation

GitHub Actions / clippy

missing documentation for a variant

warning: missing documentation for a variant --> eventually-postgres/src/aggregate.rs:77:5 | 77 | SaveAggregateState(#[source] sqlx::Error), | ^^^^^^^^^^^^^^^^^^
#[error("failed to append a new domain event: {0}")]
AppendEvent(#[source] sqlx::Error),

Check warning on line 79 in eventually-postgres/src/aggregate.rs

View workflow job for this annotation

GitHub Actions / clippy

missing documentation for a variant

warning: missing documentation for a variant --> eventually-postgres/src/aggregate.rs:79:5 | 79 | AppendEvent(#[source] sqlx::Error), | ^^^^^^^^^^^
#[error("failed to commit transaction: {0}")]
CommitTransaction(#[source] sqlx::Error),

Check warning on line 81 in eventually-postgres/src/aggregate.rs

View workflow job for this annotation

GitHub Actions / clippy

missing documentation for a variant

warning: missing documentation for a variant --> eventually-postgres/src/aggregate.rs:81:5 | 81 | CommitTransaction(#[source] sqlx::Error), | ^^^^^^^^^^^^^^^^^
#[error("database returned an error: {0}")]
Database(#[from] sqlx::Error),

Check warning on line 83 in eventually-postgres/src/aggregate.rs

View workflow job for this annotation

GitHub Actions / clippy

missing documentation for a variant

warning: missing documentation for a variant --> eventually-postgres/src/aggregate.rs:83:5 | 83 | Database(#[from] sqlx::Error), | ^^^^^^^^
}

impl From<SaveError> for Option<version::ConflictError> {
fn from(err: SaveError) -> Self {
match err {
SaveError::Conflict(v) => Some(v),
SaveError::Concurrency(v) => Some(v),

Check warning on line 90 in eventually-postgres/src/aggregate.rs

View workflow job for this annotation

GitHub Actions / clippy

this match arm has an identical body to another arm

warning: this match arm has an identical body to another arm --> eventually-postgres/src/aggregate.rs:90:13 | 90 | SaveError::Concurrency(v) => Some(v), | -------------------------^^^^^^^^^^^ | | | help: try merging the arm patterns: `SaveError::Concurrency(v) | SaveError::Conflict(v)` | = help: or try changing either arm body note: other arm here --> eventually-postgres/src/aggregate.rs:89:13 | 89 | SaveError::Conflict(v) => Some(v), | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#match_same_arms = note: `#[warn(clippy::match_same_arms)]` implied by `#[warn(clippy::pedantic)]`
_ => None,
}
}
Expand All @@ -100,7 +100,7 @@
OutT: From<T> + Send + Sync,
OutEvt: From<T::Event>,
TSerde: Serde<OutT> + Send + Sync,
EvtSerde: Serializer<OutEvt>,
EvtSerde: Serde<OutEvt>,
{
async fn save_aggregate_state(
&self,
Expand All @@ -115,14 +115,14 @@
sqlx::query("CALL upsert_aggregate($1, $2, $3, $4, $5)")
.bind(aggregate_id)
.bind(T::type_name())
.bind(expected_version as i32)

Check warning on line 118 in eventually-postgres/src/aggregate.rs

View workflow job for this annotation

GitHub Actions / clippy

casting `u64` to `i32` may truncate the value

warning: casting `u64` to `i32` may truncate the value --> eventually-postgres/src/aggregate.rs:118:19 | 118 | .bind(expected_version as i32) | ^^^^^^^^^^^^^^^^^^^^^^^ | = help: if this is intentional allow the lint with `#[allow(clippy::cast_possible_truncation)]` ... = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#cast_possible_truncation = note: `#[warn(clippy::cast_possible_truncation)]` implied by `#[warn(clippy::pedantic)]` help: ... or use `try_from` and handle the error accordingly | 118 | .bind(i32::try_from(expected_version)) | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.bind(root.version() as i32)

Check warning on line 119 in eventually-postgres/src/aggregate.rs

View workflow job for this annotation

GitHub Actions / clippy

casting `u64` to `i32` may truncate the value

warning: casting `u64` to `i32` may truncate the value --> eventually-postgres/src/aggregate.rs:119:19 | 119 | .bind(root.version() as i32) | ^^^^^^^^^^^^^^^^^^^^^ | = help: if this is intentional allow the lint with `#[allow(clippy::cast_possible_truncation)]` ... = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#cast_possible_truncation help: ... or use `try_from` and handle the error accordingly | 119 | .bind(i32::try_from(root.version())) | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.bind(bytes_state)
.execute(&mut **tx)
.await
.map_err(|err| match crate::check_for_conflict_error(&err) {
Some(err) => SaveError::Conflict(err),
None => match err.as_database_error().and_then(|err| err.code()) {

Check warning on line 125 in eventually-postgres/src/aggregate.rs

View workflow job for this annotation

GitHub Actions / clippy

redundant closure

warning: redundant closure --> eventually-postgres/src/aggregate.rs:125:64 | 125 | None => match err.as_database_error().and_then(|err| err.code()) { | ^^^^^^^^^^^^^^^^ help: replace the closure with the method itself: `sqlx::error::DatabaseError::code` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#redundant_closure_for_method_calls = note: `#[warn(clippy::redundant_closure_for_method_calls)]` implied by `#[warn(clippy::pedantic)]`
Some(code) if code == "40001" => {
SaveError::Concurrency(version::ConflictError {
expected: expected_version,
Expand All @@ -138,7 +138,7 @@
}

#[async_trait]
impl<T, OutT, OutEvt, TSerde, EvtSerde> aggregate::repository::Getter<T>
impl<T, OutT, OutEvt, TSerde, EvtSerde> aggregate::Repository<T>
for Repository<T, OutT, OutEvt, TSerde, EvtSerde>
where
T: Aggregate + TryFrom<OutT> + Send + Sync,
Expand All @@ -147,15 +147,16 @@
OutT: From<T> + Send + Sync,
OutEvt: From<T::Event> + Send + Sync,
TSerde: Serde<OutT> + Send + Sync,
<TSerde as Deserializer<OutT>>::Error: std::error::Error + Send + Sync + 'static,
EvtSerde: Serializer<OutEvt> + Send + Sync,
<TSerde as Serde<OutT>>::Error: std::error::Error + Send + Sync + 'static,
EvtSerde: Serde<OutEvt> + Send + Sync,
{
type Error = GetError;
type GetError = GetError;
type SaveError = SaveError;

async fn get(
&self,
id: &T::Id,
) -> Result<aggregate::Root<T>, aggregate::repository::GetError<Self::Error>> {
) -> Result<aggregate::Root<T>, aggregate::repository::GetError<Self::GetError>> {
let aggregate_id = id.to_string();

let row = sqlx::query(
Expand Down Expand Up @@ -184,28 +185,12 @@
})?;

Ok(aggregate::Root::rehydrate_from_state(
version as Version,

Check warning on line 188 in eventually-postgres/src/aggregate.rs

View workflow job for this annotation

GitHub Actions / clippy

casting `i32` to `u64` may lose the sign of the value

warning: casting `i32` to `u64` may lose the sign of the value --> eventually-postgres/src/aggregate.rs:188:13 | 188 | version as Version, | ^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#cast_sign_loss = note: `#[warn(clippy::cast_sign_loss)]` implied by `#[warn(clippy::pedantic)]`
aggregate,
))
}
}

#[async_trait]
impl<T, OutT, OutEvt, TSerde, EvtSerde> aggregate::repository::Saver<T>
for Repository<T, OutT, OutEvt, TSerde, EvtSerde>
where
T: Aggregate + TryFrom<OutT> + Send + Sync,
<T as Aggregate>::Id: ToString,
<T as TryFrom<OutT>>::Error: std::error::Error + Send + Sync + 'static,
OutT: From<T> + Send + Sync,
OutEvt: From<T::Event> + Send + Sync,
TSerde: Serde<OutT> + Send + Sync,
<TSerde as Deserializer<OutT>>::Error: std::error::Error + Send + Sync + 'static,
EvtSerde: Serializer<OutEvt> + Send + Sync,
{
type Error = SaveError;

async fn save(&self, root: &mut aggregate::Root<T>) -> Result<(), Self::Error> {
async fn save(&self, root: &mut aggregate::Root<T>) -> Result<(), Self::SaveError> {
let events_to_commit = root.take_uncommitted_events();

if events_to_commit.is_empty() {
Expand All @@ -232,7 +217,7 @@
&mut tx,
&self.event_serde,
&aggregate_id,
root.version() as i32,

Check warning on line 220 in eventually-postgres/src/aggregate.rs

View workflow job for this annotation

GitHub Actions / clippy

casting `u64` to `i32` may truncate the value

warning: casting `u64` to `i32` may truncate the value --> eventually-postgres/src/aggregate.rs:220:13 | 220 | root.version() as i32, | ^^^^^^^^^^^^^^^^^^^^^ | = help: if this is intentional allow the lint with `#[allow(clippy::cast_possible_truncation)]` ... = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#cast_possible_truncation help: ... or use `try_from` and handle the error accordingly | 220 | i32::try_from(root.version()), | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
events_to_commit,
)
.await
Expand Down
13 changes: 6 additions & 7 deletions eventually-postgres/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,55 +4,54 @@
use async_trait::async_trait;
use chrono::Utc;
use eventually::message::{Message, Metadata};
use eventually::serde::{Deserializer, Serde, Serializer};
use eventually::serde::Serde;
use eventually::version::Version;
use eventually::{event, version};
use futures::future::ready;
use futures::{StreamExt, TryStreamExt};
use lazy_static::lazy_static;
use regex::Regex;

Check warning on line 12 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

unused import: `regex::Regex`

warning: unused import: `regex::Regex` --> eventually-postgres/src/event.rs:12:5 | 12 | use regex::Regex; | ^^^^^^^^^^^^ | = note: `#[warn(unused_imports)]` on by default
use sqlx::postgres::{PgDatabaseError, PgRow};

Check warning on line 13 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

unused import: `PgDatabaseError`

warning: unused import: `PgDatabaseError` --> eventually-postgres/src/event.rs:13:22 | 13 | use sqlx::postgres::{PgDatabaseError, PgRow}; | ^^^^^^^^^^^^^^^
use sqlx::{PgPool, Postgres, Row, Transaction};

#[derive(Debug, thiserror::Error)]
pub enum StreamError {

Check warning on line 17 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

missing documentation for an enum

warning: missing documentation for an enum --> eventually-postgres/src/event.rs:17:1 | 17 | pub enum StreamError { | ^^^^^^^^^^^^^^^^^^^^
#[error("failed to convert domain event from its serialization type: {0}")]
ConvertEvent(#[source] Box<dyn std::error::Error + Send + Sync + 'static>),

Check warning on line 19 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

missing documentation for a variant

warning: missing documentation for a variant --> eventually-postgres/src/event.rs:19:5 | 19 | ConvertEvent(#[source] Box<dyn std::error::Error + Send + Sync + 'static>), | ^^^^^^^^^^^^
#[error("failed to deserialize event from database: {0}")]
DeserializeEvent(#[source] Box<dyn std::error::Error + Send + Sync + 'static>),

Check warning on line 21 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

missing documentation for a variant

warning: missing documentation for a variant --> eventually-postgres/src/event.rs:21:5 | 21 | DeserializeEvent(#[source] Box<dyn std::error::Error + Send + Sync + 'static>), | ^^^^^^^^^^^^^^^^
#[error("failed to get column '{name}' from result row: {error}")]
ReadColumn {

Check warning on line 23 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

missing documentation for a variant

warning: missing documentation for a variant --> eventually-postgres/src/event.rs:23:5 | 23 | ReadColumn { | ^^^^^^^^^^
name: &'static str,

Check warning on line 24 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

missing documentation for a struct field

warning: missing documentation for a struct field --> eventually-postgres/src/event.rs:24:9 | 24 | name: &'static str, | ^^^^^^^^^^^^^^^^^^
#[source]
error: sqlx::Error,

Check warning on line 26 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

missing documentation for a struct field

warning: missing documentation for a struct field --> eventually-postgres/src/event.rs:26:9 | 26 | error: sqlx::Error, | ^^^^^^^^^^^^^^^^^^
},
#[error("db returned an error: {0}")]
Database(#[source] sqlx::Error),

Check warning on line 29 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

missing documentation for a variant

warning: missing documentation for a variant --> eventually-postgres/src/event.rs:29:5 | 29 | Database(#[source] sqlx::Error), | ^^^^^^^^
}

#[derive(Debug, thiserror::Error)]
pub enum AppendError {

Check warning on line 33 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

missing documentation for an enum

warning: missing documentation for an enum --> eventually-postgres/src/event.rs:33:1 | 33 | pub enum AppendError { | ^^^^^^^^^^^^^^^^^^^^
#[error("conflict error detected: {0}")]
Conflict(#[source] version::ConflictError),

Check warning on line 35 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

missing documentation for a variant

warning: missing documentation for a variant --> eventually-postgres/src/event.rs:35:5 | 35 | Conflict(#[source] version::ConflictError), | ^^^^^^^^
#[error("concurrent update detected, represented as a conflict error: {0}")]
Concurrency(#[source] version::ConflictError),

Check warning on line 37 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

missing documentation for a variant

warning: missing documentation for a variant --> eventually-postgres/src/event.rs:37:5 | 37 | Concurrency(#[source] version::ConflictError), | ^^^^^^^^^^^
#[error("failed to begin transaction: {0}")]
BeginTransaction(#[source] sqlx::Error),

Check warning on line 39 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

missing documentation for a variant

warning: missing documentation for a variant --> eventually-postgres/src/event.rs:39:5 | 39 | BeginTransaction(#[source] sqlx::Error), | ^^^^^^^^^^^^^^^^
#[error("failed to upsert new event stream version: {0}")]
UpsertEventStream(#[source] sqlx::Error),

Check warning on line 41 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

missing documentation for a variant

warning: missing documentation for a variant --> eventually-postgres/src/event.rs:41:5 | 41 | UpsertEventStream(#[source] sqlx::Error), | ^^^^^^^^^^^^^^^^^
#[error("failed to append a new domain event: {0}")]
AppendEvent(#[source] sqlx::Error),

Check warning on line 43 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

missing documentation for a variant

warning: missing documentation for a variant --> eventually-postgres/src/event.rs:43:5 | 43 | AppendEvent(#[source] sqlx::Error), | ^^^^^^^^^^^
#[error("failed to commit transaction: {0}")]
CommitTransaction(#[source] sqlx::Error),

Check warning on line 45 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

missing documentation for a variant

warning: missing documentation for a variant --> eventually-postgres/src/event.rs:45:5 | 45 | CommitTransaction(#[source] sqlx::Error), | ^^^^^^^^^^^^^^^^^
#[error("db returned an error: {0}")]
Database(#[from] sqlx::Error),

Check warning on line 47 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

missing documentation for a variant

warning: missing documentation for a variant --> eventually-postgres/src/event.rs:47:5 | 47 | Database(#[from] sqlx::Error), | ^^^^^^^^
}

impl From<AppendError> for Option<version::ConflictError> {
fn from(err: AppendError) -> Self {
match err {
AppendError::Conflict(v) => Some(v),
AppendError::Concurrency(v) => Some(v),

Check warning on line 54 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

this match arm has an identical body to another arm

warning: this match arm has an identical body to another arm --> eventually-postgres/src/event.rs:54:13 | 54 | AppendError::Concurrency(v) => Some(v), | ---------------------------^^^^^^^^^^^ | | | help: try merging the arm patterns: `AppendError::Concurrency(v) | AppendError::Conflict(v)` | = help: or try changing either arm body note: other arm here --> eventually-postgres/src/event.rs:53:13 | 53 | AppendError::Conflict(v) => Some(v), | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#match_same_arms
_ => None,
}
}
Expand All @@ -60,7 +59,7 @@

pub(crate) async fn append_domain_event<Evt, OutEvt>(
tx: &mut Transaction<'_, Postgres>,
serde: &impl Serializer<OutEvt>,
serde: &impl Serde<OutEvt>,
event_stream_id: &str,
event_version: i32,
new_event_stream_version: i32,
Expand Down Expand Up @@ -97,7 +96,7 @@

pub(crate) async fn append_domain_events<Evt, OutEvt>(
tx: &mut Transaction<'_, Postgres>,
serde: &impl Serializer<OutEvt>,
serde: &impl Serde<OutEvt>,
event_stream_id: &str,
new_version: i32,
events: Vec<event::Envelope<Evt>>,
Expand All @@ -106,10 +105,10 @@
Evt: Message,
OutEvt: From<Evt>,
{
let current_event_stream_version = new_version - (events.len() as i32);

Check warning on line 108 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

casting `usize` to `i32` may wrap around the value on targets with 32-bit wide pointers

warning: casting `usize` to `i32` may wrap around the value on targets with 32-bit wide pointers --> eventually-postgres/src/event.rs:108:54 | 108 | let current_event_stream_version = new_version - (events.len() as i32); | ^^^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#cast_possible_wrap = note: `#[warn(clippy::cast_possible_wrap)]` implied by `#[warn(clippy::pedantic)]`

Check warning on line 108 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

casting `usize` to `i32` may truncate the value on targets with 64-bit wide pointers

warning: casting `usize` to `i32` may truncate the value on targets with 64-bit wide pointers --> eventually-postgres/src/event.rs:108:54 | 108 | let current_event_stream_version = new_version - (events.len() as i32); | ^^^^^^^^^^^^^^^^^^^^^ | = help: if this is intentional allow the lint with `#[allow(clippy::cast_possible_truncation)]` ... = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#cast_possible_truncation help: ... or use `try_from` and handle the error accordingly | 108 | let current_event_stream_version = new_version - i32::try_from(events.len()); | ~~~~~~~~~~~~~~~~~~~~~~~~~~~

for (i, event) in events.into_iter().enumerate() {
let event_version = current_event_stream_version + (i as i32) + 1;

Check warning on line 111 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

casting `usize` to `i32` may wrap around the value on targets with 32-bit wide pointers

warning: casting `usize` to `i32` may wrap around the value on targets with 32-bit wide pointers --> eventually-postgres/src/event.rs:111:60 | 111 | let event_version = current_event_stream_version + (i as i32) + 1; | ^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#cast_possible_wrap

Check warning on line 111 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

casting `usize` to `i32` may truncate the value on targets with 64-bit wide pointers

warning: casting `usize` to `i32` may truncate the value on targets with 64-bit wide pointers --> eventually-postgres/src/event.rs:111:60 | 111 | let event_version = current_event_stream_version + (i as i32) + 1; | ^^^^^^^^^^ | = help: if this is intentional allow the lint with `#[allow(clippy::cast_possible_truncation)]` ... = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#cast_possible_truncation help: ... or use `try_from` and handle the error accordingly | 111 | let event_version = current_event_stream_version + i32::try_from(i) + 1; | ~~~~~~~~~~~~~~~~

append_domain_event(
tx,
Expand All @@ -126,7 +125,7 @@
}

#[derive(Debug, Clone)]
pub struct Store<Id, Evt, OutEvt, S>

Check warning on line 128 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

missing documentation for a struct

warning: missing documentation for a struct --> eventually-postgres/src/event.rs:128:1 | 128 | pub struct Store<Id, Evt, OutEvt, S> | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
where
Id: ToString + Clone,
Evt: TryFrom<OutEvt>,
Expand All @@ -147,7 +146,7 @@
OutEvt: From<Evt>,
S: Serde<OutEvt>,
{
pub async fn new(pool: PgPool, serde: S) -> Result<Self, sqlx::migrate::MigrateError> {

Check warning on line 149 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

missing documentation for an associated function

warning: missing documentation for an associated function --> eventually-postgres/src/event.rs:149:5 | 149 | pub async fn new(pool: PgPool, serde: S) -> Result<Self, sqlx::migrate::MigrateError> { | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Check warning on line 149 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

docs for function returning `Result` missing `# Errors` section

warning: docs for function returning `Result` missing `# Errors` section --> eventually-postgres/src/event.rs:149:5 | 149 | pub async fn new(pool: PgPool, serde: S) -> Result<Self, sqlx::migrate::MigrateError> { | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#missing_errors_doc
// Make sure the latest migrations are used before using the Store instance.
crate::MIGRATIONS.run(&pool).await?;

Expand Down Expand Up @@ -176,12 +175,12 @@
<Evt as TryFrom<OutEvt>>::Error: std::error::Error + Send + Sync + 'static,
OutEvt: From<Evt> + Send + Sync,
S: Serde<OutEvt> + Send + Sync,
<S as Deserializer<OutEvt>>::Error: std::error::Error + Send + Sync + 'static,
<S as Serde<OutEvt>>::Error: std::error::Error + Send + Sync + 'static,
{
fn event_row_to_persisted_event(
&self,
stream_id: Id,
row: PgRow,

Check warning on line 183 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

this argument is passed by value, but not consumed in the function body

warning: this argument is passed by value, but not consumed in the function body --> eventually-postgres/src/event.rs:183:14 | 183 | row: PgRow, | ^^^^^ help: consider taking a reference instead: `&PgRow` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_pass_by_value = note: `#[warn(clippy::needless_pass_by_value)]` implied by `#[warn(clippy::pedantic)]`
) -> Result<event::Persisted<Id, Evt>, StreamError> {
let version_column: i32 = try_get_column(&row, "version")?;
let event_column: Vec<u8> = try_get_column(&row, "event")?;
Expand All @@ -197,7 +196,7 @@

Ok(event::Persisted {
stream_id,
version: version_column as Version,

Check warning on line 199 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

casting `i32` to `u64` may lose the sign of the value

warning: casting `i32` to `u64` may lose the sign of the value --> eventually-postgres/src/event.rs:199:22 | 199 | version: version_column as Version, | ^^^^^^^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#cast_sign_loss
event: event::Envelope {
message: converted_event,
metadata: metadata_column.0,
Expand All @@ -213,21 +212,21 @@
<Evt as TryFrom<OutEvt>>::Error: std::error::Error + Send + Sync + 'static,
OutEvt: From<Evt> + Send + Sync,
S: Serde<OutEvt> + Send + Sync,
<S as Deserializer<OutEvt>>::Error: std::error::Error + Send + Sync + 'static,
<S as Serde<OutEvt>>::Error: std::error::Error + Send + Sync + 'static,
{
type Error = StreamError;

fn stream(&self, id: &Id, select: event::VersionSelect) -> event::Stream<Id, Evt, Self::Error> {
let from_version: i32 = match select {
event::VersionSelect::All => 0,
event::VersionSelect::From(v) => v as i32,

Check warning on line 222 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

casting `u64` to `i32` may truncate the value

warning: casting `u64` to `i32` may truncate the value --> eventually-postgres/src/event.rs:222:46 | 222 | event::VersionSelect::From(v) => v as i32, | ^^^^^^^^ | = help: if this is intentional allow the lint with `#[allow(clippy::cast_possible_truncation)]` ... = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#cast_possible_truncation help: ... or use `try_from` and handle the error accordingly | 222 | event::VersionSelect::From(v) => i32::try_from(v), | ~~~~~~~~~~~~~~~~
};

let query = sqlx::query(
r#"SELECT version, event, metadata
FROM events
WHERE event_stream_id = $1 AND version >= $2
ORDER BY version"#,

Check warning on line 229 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

unnecessary hashes around raw string literal

warning: unnecessary hashes around raw string literal --> eventually-postgres/src/event.rs:226:13 | 226 | / r#"SELECT version, event, metadata 227 | | FROM events 228 | | WHERE event_stream_id = $1 AND version >= $2 229 | | ORDER BY version"#, | |_________________________________^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_raw_string_hashes note: the lint level is defined here --> eventually-postgres/src/lib.rs:3:9 | 3 | #![warn(clippy::pedantic)] | ^^^^^^^^^^^^^^^^ = note: `#[warn(clippy::needless_raw_string_hashes)]` implied by `#[warn(clippy::pedantic)]` help: remove all the hashes around the string literal | 226 ~ r"SELECT version, event, metadata 227 | FROM events 228 | WHERE event_stream_id = $1 AND version >= $2 229 ~ ORDER BY version", |
);

let id = id.clone();
Expand All @@ -250,7 +249,7 @@
<Evt as TryFrom<OutEvt>>::Error: std::error::Error + Send + Sync + 'static,
OutEvt: From<Evt> + Send + Sync,
S: Serde<OutEvt> + Send + Sync,
<S as Deserializer<OutEvt>>::Error: std::error::Error + Send + Sync + 'static,
<S as Serde<OutEvt>>::Error: std::error::Error + Send + Sync + 'static,
{
type Error = AppendError;

Expand All @@ -274,7 +273,7 @@

let new_version: i32 = match version_check {
event::StreamVersionExpected::Any => {
let events_len = events.len() as i32;

Check warning on line 276 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

casting `usize` to `i32` may wrap around the value on targets with 32-bit wide pointers

warning: casting `usize` to `i32` may wrap around the value on targets with 32-bit wide pointers --> eventually-postgres/src/event.rs:276:34 | 276 | let events_len = events.len() as i32; | ^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#cast_possible_wrap

Check warning on line 276 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

casting `usize` to `i32` may truncate the value on targets with 64-bit wide pointers

warning: casting `usize` to `i32` may truncate the value on targets with 64-bit wide pointers --> eventually-postgres/src/event.rs:276:34 | 276 | let events_len = events.len() as i32; | ^^^^^^^^^^^^^^^^^^^ | = help: if this is intentional allow the lint with `#[allow(clippy::cast_possible_truncation)]` ... = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#cast_possible_truncation help: ... or use `try_from` and handle the error accordingly | 276 | let events_len = i32::try_from(events.len()); | ~~~~~~~~~~~~~~~~~~~~~~~~~~~

sqlx::query("SELECT * FROM upsert_event_stream_with_no_version_check($1, $2)")
.bind(&string_id)
Expand All @@ -288,13 +287,13 @@

sqlx::query("CALL upsert_event_stream($1, $2, $3)")
.bind(&string_id)
.bind(v as i32)

Check warning on line 290 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

casting `u64` to `i32` may truncate the value

warning: casting `u64` to `i32` may truncate the value --> eventually-postgres/src/event.rs:290:27 | 290 | .bind(v as i32) | ^^^^^^^^ | = help: if this is intentional allow the lint with `#[allow(clippy::cast_possible_truncation)]` ... = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#cast_possible_truncation help: ... or use `try_from` and handle the error accordingly | 290 | .bind(i32::try_from(v)) | ~~~~~~~~~~~~~~~~
.bind(new_version as i32)

Check warning on line 291 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

casting `u64` to `i32` may truncate the value

warning: casting `u64` to `i32` may truncate the value --> eventually-postgres/src/event.rs:291:27 | 291 | .bind(new_version as i32) | ^^^^^^^^^^^^^^^^^^ | = help: if this is intentional allow the lint with `#[allow(clippy::cast_possible_truncation)]` ... = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#cast_possible_truncation help: ... or use `try_from` and handle the error accordingly | 291 | .bind(i32::try_from(new_version)) | ~~~~~~~~~~~~~~~~~~~~~~~~~~
.execute(&mut *tx)
.await
.map_err(|err| match crate::check_for_conflict_error(&err) {
Some(err) => AppendError::Conflict(err),
None => match err.as_database_error().and_then(|err| err.code()) {

Check warning on line 296 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

redundant closure

warning: redundant closure --> eventually-postgres/src/event.rs:296:72 | 296 | None => match err.as_database_error().and_then(|err| err.code()) { | ^^^^^^^^^^^^^^^^ help: replace the closure with the method itself: `sqlx::error::DatabaseError::code` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#redundant_closure_for_method_calls
Some(code) if code == "40001" => {
AppendError::Concurrency(version::ConflictError {
expected: v,
Expand All @@ -304,7 +303,7 @@
_ => AppendError::UpsertEventStream(err),
},
})
.map(|_| new_version as i32)?

Check warning on line 306 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

casting `u64` to `i32` may truncate the value

warning: casting `u64` to `i32` may truncate the value --> eventually-postgres/src/event.rs:306:30 | 306 | .map(|_| new_version as i32)? | ^^^^^^^^^^^^^^^^^^ | = help: if this is intentional allow the lint with `#[allow(clippy::cast_possible_truncation)]` ... = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#cast_possible_truncation help: ... or use `try_from` and handle the error accordingly | 306 | .map(|_| i32::try_from(new_version))? | ~~~~~~~~~~~~~~~~~~~~~~~~~~
},
};

Expand All @@ -314,6 +313,6 @@

tx.commit().await.map_err(AppendError::CommitTransaction)?;

Ok(new_version as Version)

Check warning on line 316 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

casting `i32` to `u64` may lose the sign of the value

warning: casting `i32` to `u64` may lose the sign of the value --> eventually-postgres/src/event.rs:316:12 | 316 | Ok(new_version as Version) | ^^^^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#cast_sign_loss
}
}
13 changes: 7 additions & 6 deletions eventually-postgres/tests/aggregate_repository.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use eventually::aggregate::repository::{GetError, Getter, Saver};
use eventually::serde::json::Json;
use eventually::aggregate::repository::GetError;
use eventually::aggregate::Repository;
use eventually::serde::json::JsonSerde;
use eventually::version;
use eventually_postgres::aggregate;
use futures::TryFutureExt;
Expand All @@ -15,8 +16,8 @@ async fn it_works() {

let aggregate_repository = aggregate::Repository::new(
pool,
Json::<setup::TestAggregate>::default(),
Json::<setup::TestDomainEvent>::default(),
JsonSerde::<setup::TestAggregate>::default(),
JsonSerde::<setup::TestDomainEvent>::default(),
)
.await
.unwrap();
Expand Down Expand Up @@ -64,8 +65,8 @@ async fn it_detects_data_races_and_returns_conflict_error() {

let aggregate_repository = aggregate::Repository::new(
pool,
Json::<setup::TestAggregate>::default(),
Json::<setup::TestDomainEvent>::default(),
JsonSerde::<setup::TestAggregate>::default(),
JsonSerde::<setup::TestDomainEvent>::default(),
)
.await
.unwrap();
Expand Down
10 changes: 5 additions & 5 deletions eventually-postgres/tests/event_store.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::time::{SystemTime, UNIX_EPOCH};

use eventually::event::{Appender, Persisted, StreamVersionExpected, Streamer, VersionSelect};
use eventually::serde::json::Json;
use eventually::serde::json::JsonSerde;
use eventually::version;
use eventually::version::Version;
use eventually_postgres::event;
use futures::{TryFutureExt, TryStreamExt};
use futures::TryStreamExt;
use rand::Rng;

mod setup;
Expand All @@ -16,7 +16,7 @@ async fn append_with_no_version_check_works() {
.await
.expect("connection to the database should work");

let event_store = event::Store::new(pool, Json::<setup::TestDomainEvent>::default())
let event_store = event::Store::new(pool, JsonSerde::<setup::TestDomainEvent>::default())
.await
.unwrap();

Expand Down Expand Up @@ -72,7 +72,7 @@ async fn it_works_with_version_check_for_conflict() {
.await
.expect("connection to the database should work");

let event_store = event::Store::new(pool, Json::<setup::TestDomainEvent>::default())
let event_store = event::Store::new(pool, JsonSerde::<setup::TestDomainEvent>::default())
.await
.unwrap();

Expand Down Expand Up @@ -148,7 +148,7 @@ async fn it_handles_concurrent_writes_to_the_same_stream() {
.await
.expect("connection to the database should work");

let event_store = event::Store::new(pool, Json::<setup::TestDomainEvent>::default())
let event_store = event::Store::new(pool, JsonSerde::<setup::TestDomainEvent>::default())
.await
.unwrap();

Expand Down
6 changes: 2 additions & 4 deletions eventually/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ default = []
tracing = ["dep:tracing"]
serde-prost = ["dep:prost"]
serde-json = ["dep:serde_json"]
serde-protobuf = ["dep:protobuf", "dep:protobuf-json-mapping"]
full = ["serde-prost", "serde-json", "serde-protobuf", "tracing"]
full = ["serde-prost", "serde-json", "tracing"]

[dependencies]
async-trait = "0.1.74"
Expand All @@ -31,9 +30,8 @@ thiserror = "1.0.50"
prost = { version = "0.12.1", optional = true }
serde_json = { version = "1.0.108", optional = true }
serde = { version = "1.0.192", features = ["derive"] }
protobuf = { version = "3.3.0", optional = true }
protobuf-json-mapping = { version = "3.3.0", optional = true }
tracing = { version = "0.1.40", features = ["async-await"], optional = true }
anyhow = "1.0.75"

[dev-dependencies]
# NOTE: this is only used for test components and assertions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,8 +387,8 @@ pub(crate) mod test_user_domain {
mod test {
use std::error::Error;

use crate::aggregate::repository::{Getter, Saver};
use crate::aggregate::test_user_domain::{User, UserEvent};
use crate::aggregate::Repository;
use crate::event::store::EventStoreExt;
use crate::{aggregate, event, version};

Expand Down
Loading
Loading