Skip to content

Commit

Permalink
Make Connection::call return a Result (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
czocher authored Apr 2, 2023
1 parent 066ea3a commit d97e88d
Show file tree
Hide file tree
Showing 4 changed files with 208 additions and 80 deletions.
8 changes: 4 additions & 4 deletions examples/in_memory.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![allow(dead_code)]

use rusqlite::{params, Result};
use tokio_rusqlite::Connection;
use rusqlite::params;
use tokio_rusqlite::{Connection, Result};

#[derive(Debug)]
struct Person {
Expand Down Expand Up @@ -45,7 +45,7 @@ async fn main() -> Result<()> {
data: row.get(2)?,
})
})?
.collect::<Result<Vec<Person>, rusqlite::Error>>()?;
.collect::<std::result::Result<Vec<Person>, rusqlite::Error>>()?;

Ok::<_, rusqlite::Error>(people)
})
Expand All @@ -55,6 +55,6 @@ async fn main() -> Result<()> {
println!("Found person {:?}", person);
}

conn.close().await.map_err(|e| e.1)?;
conn.close().await?;
Ok(())
}
8 changes: 4 additions & 4 deletions examples/multiple_tasks.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#![allow(dead_code)]

use rusqlite::{params, Result};
use rusqlite::params;
use tokio::task::JoinHandle;
use tokio_rusqlite::Connection;
use tokio_rusqlite::{Connection, Result};

#[derive(Debug)]
struct Person {
Expand Down Expand Up @@ -48,7 +48,7 @@ async fn main() -> Result<()> {
data: row.get(2)?,
})
})?
.collect::<Result<Vec<Person>, rusqlite::Error>>()?;
.collect::<std::result::Result<Vec<Person>, rusqlite::Error>>()?;

Ok::<_, rusqlite::Error>(people)
})
Expand All @@ -58,7 +58,7 @@ async fn main() -> Result<()> {
println!("Found person {:?}", person);
}

conn.close().await.map_err(|e| e.1)?;
conn.close().await?;
Ok(())
}

Expand Down
134 changes: 100 additions & 34 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
//! # Example
//!
//! ```rust,no_run
//! use rusqlite::{params, Result};
//! use tokio_rusqlite::Connection;
//! use rusqlite::params;
//! use tokio_rusqlite::{Connection, Result};
//!
//! #[derive(Debug)]
//! struct Person {
Expand Down Expand Up @@ -62,7 +62,7 @@
//! data: row.get(2)?,
//! })
//! })?
//! .collect::<Result<Vec<Person>, rusqlite::Error>>()?;
//! .collect::<std::result::Result<Vec<Person>, rusqlite::Error>>()?;
//!
//! Ok::<_, rusqlite::Error>(people)
//! })
Expand Down Expand Up @@ -102,19 +102,64 @@ mod tests;
use crossbeam_channel::Sender;
use rusqlite::OpenFlags;
use std::{
fmt::{self, Debug},
fmt::{self, Debug, Display},
path::Path,
thread,
};
use tokio::sync::oneshot;
use tokio::sync::oneshot::{self};

const BUG_TEXT: &str = "bug in tokio-rusqlite, please report";

#[derive(Debug)]
/// Represents the errors specific for this library.
#[non_exhaustive]
pub enum Error {
/// The connection to the SQLite has been closed and cannot be queried any more.
ConnectionClosed,

/// An error occured while closing the SQLite connection.
/// This `Error` variant contains the [`Connection`], which can be used to retry the close operation
/// and the underlying [`rusqlite::Error`] that made it impossile to close the database.
Close((Connection, rusqlite::Error)),

/// A `Rusqlite` error occured.
Rusqlite(rusqlite::Error),
}

impl Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Error::ConnectionClosed => write!(f, "ConnectionClosed"),
Error::Close(e) => write!(f, "Close((Connection, \"{}\"))", e.1),
Error::Rusqlite(e) => write!(f, "Rusqlite(\"{}\")", e),
}
}
}

impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Error::ConnectionClosed => None,
Error::Close((_, e)) => Some(e),
Error::Rusqlite(e) => Some(e),
}
}
}

impl From<rusqlite::Error> for Error {
fn from(value: rusqlite::Error) -> Self {
Error::Rusqlite(value)
}
}

/// The result returned on method calls in this crate.
pub type Result<T> = std::result::Result<T, Error>;

type CallFn = Box<dyn FnOnce(&mut rusqlite::Connection) + Send + 'static>;

enum Message {
Execute(CallFn),
Close(oneshot::Sender<Result<(), rusqlite::Error>>),
Close(oneshot::Sender<std::result::Result<(), rusqlite::Error>>),
}

/// A handle to call functions in background thread.
Expand All @@ -134,18 +179,22 @@ impl Connection {
///
/// Will return `Err` if `path` cannot be converted to a C-compatible
/// string or if the underlying SQLite open call fails.
pub async fn open<P: AsRef<Path>>(path: P) -> rusqlite::Result<Self> {
pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
let path = path.as_ref().to_owned();
start(move || rusqlite::Connection::open(path)).await
start(move || rusqlite::Connection::open(path))
.await
.map_err(Error::Rusqlite)
}

/// Open a new connection to an in-memory SQLite database.
///
/// # Failure
///
/// Will return `Err` if the underlying SQLite open call fails.
pub async fn open_in_memory() -> rusqlite::Result<Self> {
start(rusqlite::Connection::open_in_memory).await
pub async fn open_in_memory() -> Result<Self> {
start(rusqlite::Connection::open_in_memory)
.await
.map_err(Error::Rusqlite)
}

/// Open a new connection to a SQLite database.
Expand All @@ -157,12 +206,11 @@ impl Connection {
///
/// Will return `Err` if `path` cannot be converted to a C-compatible
/// string or if the underlying SQLite open call fails.
pub async fn open_with_flags<P: AsRef<Path>>(
path: P,
flags: OpenFlags,
) -> rusqlite::Result<Self> {
pub async fn open_with_flags<P: AsRef<Path>>(path: P, flags: OpenFlags) -> Result<Self> {
let path = path.as_ref().to_owned();
start(move || rusqlite::Connection::open_with_flags(path, flags)).await
start(move || rusqlite::Connection::open_with_flags(path, flags))
.await
.map_err(Error::Rusqlite)
}

/// Open a new connection to a SQLite database using the specific flags
Expand All @@ -179,10 +227,12 @@ impl Connection {
path: P,
flags: OpenFlags,
vfs: &str,
) -> rusqlite::Result<Self> {
) -> Result<Self> {
let path = path.as_ref().to_owned();
let vfs = vfs.to_owned();
start(move || rusqlite::Connection::open_with_flags_and_vfs(path, flags, &vfs)).await
start(move || rusqlite::Connection::open_with_flags_and_vfs(path, flags, &vfs))
.await
.map_err(Error::Rusqlite)
}

/// Open a new connection to an in-memory SQLite database.
Expand All @@ -193,8 +243,10 @@ impl Connection {
/// # Failure
///
/// Will return `Err` if the underlying SQLite open call fails.
pub async fn open_in_memory_with_flags(flags: OpenFlags) -> rusqlite::Result<Self> {
start(move || rusqlite::Connection::open_in_memory_with_flags(flags)).await
pub async fn open_in_memory_with_flags(flags: OpenFlags) -> Result<Self> {
start(move || rusqlite::Connection::open_in_memory_with_flags(flags))
.await
.map_err(Error::Rusqlite)
}

/// Open a new connection to an in-memory SQLite database using the
Expand All @@ -207,31 +259,37 @@ impl Connection {
///
/// Will return `Err` if `vfs` cannot be converted to a C-compatible
/// string or if the underlying SQLite open call fails.
pub async fn open_in_memory_with_flags_and_vfs(
flags: OpenFlags,
vfs: &str,
) -> rusqlite::Result<Self> {
pub async fn open_in_memory_with_flags_and_vfs(flags: OpenFlags, vfs: &str) -> Result<Self> {
let vfs = vfs.to_owned();
start(move || rusqlite::Connection::open_in_memory_with_flags_and_vfs(flags, &vfs)).await
start(move || rusqlite::Connection::open_in_memory_with_flags_and_vfs(flags, &vfs))
.await
.map_err(Error::Rusqlite)
}

/// Call a function in background thread and get the result
/// asynchronously.
pub async fn call<F, R>(&self, function: F) -> R
///
/// # Failure
///
/// Will return `Err` if the database connection has been closed.
pub async fn call<F, R>(&self, function: F) -> Result<R>
where
F: FnOnce(&mut rusqlite::Connection) -> R + Send + 'static,
F: FnOnce(&mut rusqlite::Connection) -> rusqlite::Result<R> + 'static + Send,
R: Send + 'static,
{
let (sender, receiver) = oneshot::channel::<R>();
let (sender, receiver) = oneshot::channel::<rusqlite::Result<R>>();

self.sender
.send(Message::Execute(Box::new(move |conn| {
let value = function(conn);
let _ = sender.send(value);
})))
.expect("database connection should be open");
.map_err(|_| Error::ConnectionClosed)?;

receiver.await.expect(BUG_TEXT)
receiver
.await
.map_err(|_| Error::ConnectionClosed)?
.map_err(Error::Rusqlite)
}

/// Close the database connection.
Expand All @@ -242,22 +300,30 @@ impl Connection {
///
/// If successful, any following `close` operations performed
/// on `Connection` copies will succeed immediately.
///
/// On the other hand, any calls to [`Connection::call`] will `panic`.
///
/// On the other hand, any calls to [`Connection::call`] will return a [`Error::ConnectionClosed`].
///
/// # Failure
///
/// Will return `Err` if the underlying SQLite close call fails.
pub async fn close(self) -> Result<(), (Self, rusqlite::Error)> {
let (sender, receiver) = oneshot::channel::<Result<(), rusqlite::Error>>();
pub async fn close(self) -> Result<()> {
let (sender, receiver) = oneshot::channel::<std::result::Result<(), rusqlite::Error>>();

if let Err(crossbeam_channel::SendError(_)) = self.sender.send(Message::Close(sender)) {
// If the channel is closed on the other side, it means the connection closed successfully
// This is a safeguard against calling close on a `Copy` of the connection
return Ok(());
}

receiver.await.expect(BUG_TEXT).map_err(|e| (self, e))
let result = receiver.await;

if result.is_err() {
// If we get a RecvError at this point, it also means the channel closed in the meantime
// we can assume the connection is closed
return Ok(());
}

result.unwrap().map_err(|e| Error::Close((self, e)))
}
}

Expand Down
Loading

0 comments on commit d97e88d

Please sign in to comment.