Skip to content

Commit 874f55e

Browse files
committed
reducer can now handle errors
1 parent aea5885 commit 874f55e

File tree

10 files changed

+109
-116
lines changed

10 files changed

+109
-116
lines changed

demo/demo-reducer/src/lib.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ async fn reducer(mutation: Vec<u8>) -> Result<(), ReducerError> {
2929
created_at TEXT NOT NULL
3030
)"
3131
)
32-
.await;
32+
.await?;
3333
}
3434

3535
Mutation::CreateTask { id, description } => {
@@ -40,19 +40,19 @@ async fn reducer(mutation: Vec<u8>) -> Result<(), ReducerError> {
4040
id,
4141
description
4242
)
43-
.await;
43+
.await?;
4444
}
4545

4646
Mutation::DeleteTask { id } => {
47-
execute!("delete from tasks where id = ?", id).await;
47+
execute!("delete from tasks where id = ?", id).await?;
4848
}
4949

5050
Mutation::ToggleCompleted { id } => {
5151
execute!(
5252
"update tasks set completed = not completed where id = ?",
5353
id
5454
)
55-
.await;
55+
.await?;
5656
}
5757
}
5858

lib/sqlsync-react/sqlsync-react-test-reducer/src/lib.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,18 @@ async fn reducer(mutation: Vec<u8>) -> Result<(), ReducerError> {
1616

1717
match mutation {
1818
Mutation::InitSchema => {
19-
futures::join!(
20-
execute!(
21-
"CREATE TABLE IF NOT EXISTS counter (
19+
let create_table = execute!(
20+
"CREATE TABLE IF NOT EXISTS counter (
2221
id INTEGER PRIMARY KEY,
2322
value INTEGER
2423
)"
25-
),
26-
execute!("INSERT OR IGNORE INTO counter (id, value) VALUES (0, 0)")
2724
);
25+
let init_counter = execute!(
26+
"INSERT OR IGNORE INTO counter (id, value) VALUES (0, 0)"
27+
);
28+
29+
create_table.await?;
30+
init_counter.await?;
2831
}
2932

3033
Mutation::Incr { value } => {
@@ -33,7 +36,7 @@ async fn reducer(mutation: Vec<u8>) -> Result<(), ReducerError> {
3336
ON CONFLICT (id) DO UPDATE SET value = value + ?",
3437
value
3538
)
36-
.await;
39+
.await?;
3740
}
3841

3942
Mutation::Decr { value } => {
@@ -42,7 +45,7 @@ async fn reducer(mutation: Vec<u8>) -> Result<(), ReducerError> {
4245
ON CONFLICT (id) DO UPDATE SET value = value - ?",
4346
value
4447
)
45-
.await;
48+
.await?;
4649
}
4750
}
4851

lib/sqlsync-reducer/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,13 @@ serde = { workspace = true, features = ["derive"] }
1515
bincode.workspace = true
1616
futures.workspace = true
1717
log.workspace = true
18+
thiserror.workspace = true
1819

1920
wasmi = { workspace = true, optional = true }
20-
thiserror = { workspace = true, optional = true }
2121

2222
[features]
2323
default = ["guest"]
24-
host = ["wasmi", "thiserror"]
24+
host = ["wasmi"]
2525
guest = []
2626

2727
[dev-dependencies]

lib/sqlsync-reducer/examples/host.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::collections::BTreeMap;
55
use serde::{Deserialize, Serialize};
66
use sqlsync_reducer::{
77
host_ffi::{register_log_handler, WasmFFI},
8-
types::{ExecResponse, QueryResponse, Request, SqliteError},
8+
types::{ErrorResponse, ExecResponse, QueryResponse, Request},
99
};
1010
use wasmi::{Engine, Linker, Module, Store};
1111

@@ -58,7 +58,7 @@ fn main() -> anyhow::Result<()> {
5858
log::info!("received query request: {} {:?}", sql, params);
5959
let ptr = ffi.encode(
6060
&mut store,
61-
&Ok::<_, SqliteError>(QueryResponse {
61+
&Ok::<_, ErrorResponse>(QueryResponse {
6262
columns: vec!["foo".into(), "bar".into()],
6363
rows: vec![vec!["baz".into(), "qux".into()].into()],
6464
}),
@@ -70,16 +70,20 @@ fn main() -> anyhow::Result<()> {
7070
if sql == "FAIL" {
7171
let ptr = ffi.encode(
7272
&mut store,
73-
&Err::<ExecResponse, _>(SqliteError {
74-
code: Some(1),
75-
message: "error".to_string(),
76-
}),
73+
&Err::<ExecResponse, _>(
74+
ErrorResponse::SqliteError {
75+
code: 1,
76+
message: "error".to_string(),
77+
},
78+
),
7779
)?;
7880
responses.insert(id, ptr);
7981
} else {
8082
let ptr = ffi.encode(
8183
&mut store,
82-
&Ok::<_, SqliteError>(ExecResponse { changes: 1 }),
84+
&Ok::<_, ErrorResponse>(ExecResponse {
85+
changes: 1,
86+
}),
8387
)?;
8488
responses.insert(id, ptr);
8589
}

lib/sqlsync-reducer/src/guest_reactor.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ use serde::de::DeserializeOwned;
1212
use crate::{
1313
guest_ffi::{fbm, FFIBufPtr},
1414
types::{
15-
ExecResponse, QueryResponse, ReducerError, Request, RequestId,
16-
Requests, Responses, SqliteError, SqliteValue,
15+
ErrorResponse, ExecResponse, QueryResponse, ReducerError, Request,
16+
RequestId, Requests, Responses, SqliteValue,
1717
},
1818
};
1919

@@ -107,6 +107,7 @@ impl Reactor {
107107
}
108108
}
109109

110+
#[must_use]
110111
pub struct ResponseFuture<T: DeserializeOwned> {
111112
id: RequestId,
112113
_marker: std::marker::PhantomData<T>,
@@ -132,7 +133,7 @@ impl<T: DeserializeOwned> Future for ResponseFuture<T> {
132133
pub fn raw_query(
133134
sql: String,
134135
params: Vec<SqliteValue>,
135-
) -> ResponseFuture<Result<QueryResponse, SqliteError>> {
136+
) -> ResponseFuture<Result<QueryResponse, ErrorResponse>> {
136137
let request = Request::Query { sql, params };
137138
let id = reactor().queue_request(request);
138139
ResponseFuture::new(id)
@@ -141,7 +142,7 @@ pub fn raw_query(
141142
pub fn raw_execute(
142143
sql: String,
143144
params: Vec<SqliteValue>,
144-
) -> ResponseFuture<Result<ExecResponse, SqliteError>> {
145+
) -> ResponseFuture<Result<ExecResponse, ErrorResponse>> {
145146
let request = Request::Exec { sql, params };
146147
let id = reactor().queue_request(request);
147148
ResponseFuture::new(id)

lib/sqlsync-reducer/src/types.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use std::{
99

1010
use log::Level;
1111
use serde::{Deserialize, Serialize};
12+
use thiserror::Error;
1213

1314
pub type RequestId = u32;
1415

@@ -62,10 +63,12 @@ pub struct ExecResponse {
6263
pub changes: usize,
6364
}
6465

65-
#[derive(Serialize, Deserialize, Debug)]
66-
pub struct SqliteError {
67-
pub code: Option<i32>,
68-
pub message: String,
66+
#[derive(Serialize, Deserialize, Debug, Error)]
67+
pub enum ErrorResponse {
68+
#[error("SQLite Error({code}): {message}")]
69+
SqliteError { code: i32, message: String },
70+
#[error("Unknown: {0}")]
71+
Unknown(String),
6972
}
7073

7174
#[derive(Serialize, Deserialize)]

lib/sqlsync/examples/counter-reducer.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,31 +15,32 @@ async fn reducer(mutation: Vec<u8>) -> Result<(), ReducerError> {
1515
let mutation: Mutation = bincode::deserialize(&mutation)?;
1616
match mutation {
1717
Mutation::InitSchema => {
18-
futures::join!(
19-
execute!(
20-
"CREATE TABLE IF NOT EXISTS counter (
18+
let create_table = execute!(
19+
"CREATE TABLE IF NOT EXISTS counter (
2120
id INTEGER PRIMARY KEY,
2221
value INTEGER
2322
)"
24-
),
25-
execute!(
26-
"INSERT OR IGNORE INTO counter (id, value) VALUES (0, 0)"
27-
)
2823
);
24+
let init_counter = execute!(
25+
"INSERT OR IGNORE INTO counter (id, value) VALUES (0, 0)"
26+
);
27+
28+
create_table.await?;
29+
init_counter.await?;
2930
}
3031
Mutation::Incr => {
3132
execute!(
3233
"INSERT INTO counter (id, value) VALUES (0, 0)
3334
ON CONFLICT (id) DO UPDATE SET value = value + 1"
3435
)
35-
.await;
36+
.await?;
3637
}
3738
Mutation::Decr => {
3839
execute!(
3940
"INSERT INTO counter (id, value) VALUES (0, 0)
4041
ON CONFLICT (id) DO UPDATE SET value = value - 1"
4142
)
42-
.await;
43+
.await?;
4344
}
4445
}
4546

lib/sqlsync/examples/hello-reducer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@ async fn reducer(mutation: Vec<u8>) -> Result<(), ReducerError> {
1818
key,
1919
value
2020
)
21-
.await;
21+
.await?;
2222
}
2323
Mutation::Delete(key) => {
24-
execute!("DELETE FROM kv WHERE key = ?", key).await;
24+
execute!("DELETE FROM kv WHERE key = ?", key).await?;
2525
}
2626
}
2727

lib/sqlsync/examples/task-reducer.rs

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ enum Mutation {
2929
}
3030

3131
async fn query_max_sort() -> Result<f64, ReducerError> {
32-
let response = query!("select max(sort) from tasks").await;
32+
let response = query!("select max(sort) from tasks").await?;
3333
assert!(response.rows.len() == 1, "expected 1 row");
3434
Ok(response.rows[0].maybe_get(0)?.unwrap_or(0.0))
3535
}
@@ -45,7 +45,7 @@ async fn query_sort_after(id: i64) -> Result<f64, ReducerError> {
4545
",
4646
id
4747
)
48-
.await;
48+
.await?;
4949

5050
if response.rows.len() == 0 {
5151
query_max_sort().await
@@ -73,7 +73,7 @@ async fn reducer(mutation: Vec<u8>) -> Result<(), ReducerError> {
7373
created_at TEXT NOT NULL
7474
)"
7575
)
76-
.await;
76+
.await?;
7777
}
7878

7979
Mutation::AppendTask { id, description } => {
@@ -86,18 +86,14 @@ async fn reducer(mutation: Vec<u8>) -> Result<(), ReducerError> {
8686
max_sort + 1.,
8787
description
8888
)
89-
.await;
89+
.await?;
9090
}
9191

9292
Mutation::RemoveTask { id } => {
93-
execute!("delete from tasks where id = ?", id).await;
93+
execute!("delete from tasks where id = ?", id).await?;
9494
}
9595

96-
Mutation::UpdateTask {
97-
id,
98-
description,
99-
completed,
100-
} => {
96+
Mutation::UpdateTask { id, description, completed } => {
10197
execute!(
10298
"update tasks set
10399
description = IFNULL(?, description),
@@ -107,12 +103,13 @@ async fn reducer(mutation: Vec<u8>) -> Result<(), ReducerError> {
107103
description,
108104
completed
109105
)
110-
.await;
106+
.await?;
111107
}
112108

113109
Mutation::MoveTask { id, after } => {
114110
let new_sort = query_sort_after(after).await?;
115-
execute!("update tasks set sort = ? where id = ?", new_sort, id);
111+
execute!("update tasks set sort = ? where id = ?", new_sort, id)
112+
.await?;
116113
}
117114
}
118115

0 commit comments

Comments
 (0)