Skip to content

Commit

Permalink
feat(core): Update libsql backends
Browse files Browse the repository at this point in the history
  • Loading branch information
GeekMasher committed Jan 11, 2025
1 parent 7872a6b commit 7b7c2ef
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 117 deletions.
123 changes: 6 additions & 117 deletions geekorm-core/src/backends/libsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
//!
//! ```no_run
//! # #[cfg(feature = "libsql")] {
//! use std::sync::{Arc, RwLock};
//! use std::sync::Arc;
//! use geekorm::prelude::*;
//!
//! #[derive(Table, Clone, Default, serde::Serialize, serde::Deserialize)]
Expand All @@ -22,7 +22,7 @@
//! #[tokio::main]
//! async fn main() -> anyhow::Result<()> {
//! let database = libsql::Builder::new_local(":memory:").build().await?;
//! let connection = Arc::new(RwLock::new(database.connect().unwrap()));
//! let connection = Arc::new(tokio::sync::Mutex::new(database.connect().unwrap()));
//!
//! Users::create_table(&connection).await?;
//!
Expand All @@ -40,16 +40,15 @@ use libsql::{de, params::IntoValue};
#[cfg(feature = "log")]
use log::{debug, error};
use serde::{de::DeserializeOwned, Serialize};
#[cfg(not(feature = "backends-tokio"))]
use std::sync::Mutex;
use std::{collections::HashMap, sync::Arc};
#[cfg(feature = "backends-tokio")]
use tokio::sync::Mutex;
use std::collections::HashMap;

use crate::{
builder::models::QueryType, GeekConnection, QueryBuilderTrait, TableBuilder, Value, Values,
};

#[cfg(feature = "backends-tokio")]
mod mutex;

impl GeekConnection for libsql::Connection {
type Connection = libsql::Connection;

Expand Down Expand Up @@ -382,116 +381,6 @@ impl GeekConnection for libsql::Connection {
}
}

const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);

impl<C> GeekConnection for Arc<tokio::sync::Mutex<C>>
where
Self: Sync + Send + 'static,
C: GeekConnection<Connection = libsql::Connection>,
{
type Connection = Arc<Mutex<libsql::Connection>>;

async fn create_table<T>(connection: &Self::Connection) -> Result<(), crate::Error>
where
T: TableBuilder + QueryBuilderTrait + Sized + Serialize + DeserializeOwned,
{
let start = std::time::Instant::now();
while start.elapsed() < TIMEOUT {
match connection.try_lock() {
Ok(conn) => return C::create_table::<T>(&conn).await,
Err(_) => {
std::thread::sleep(std::time::Duration::from_millis(10));
}
}
}
Err(crate::Error::LibSQLError(
"Error getting write lock on connection".to_string(),
))
}

async fn row_count(
connection: &Self::Connection,
query: crate::Query,
) -> Result<i64, crate::Error> {
let start = std::time::Instant::now();
while start.elapsed() < TIMEOUT {
match connection.try_lock() {
Ok(conn) => return C::row_count(&conn, query).await,
Err(_) => {
std::thread::sleep(std::time::Duration::from_millis(10));
}
}
}
Err(crate::Error::LibSQLError(
"Error getting write lock on connection in row_count".to_string(),
))
}

async fn query<T>(
connection: &Self::Connection,
query: crate::Query,
) -> Result<Vec<T>, crate::Error>
where
T: serde::de::DeserializeOwned,
{
let start = std::time::Instant::now();
while start.elapsed() < TIMEOUT {
match connection.try_lock() {
Ok(conn) => return C::query::<T>(&conn, query).await,
Err(_) => {
std::thread::sleep(std::time::Duration::from_millis(10));
}
}
}

Err(crate::Error::LibSQLError(
"Error getting write lock on connection".to_string(),
))
}

async fn query_first<T>(
connection: &Self::Connection,
query: crate::Query,
) -> Result<T, crate::Error>
where
T: serde::de::DeserializeOwned,
{
let start = std::time::Instant::now();
while start.elapsed() < TIMEOUT {
match connection.try_lock() {
Ok(conn) => return C::query_first::<T>(&conn, query).await,
Err(_) => {
std::thread::sleep(std::time::Duration::from_millis(10));
}
}
}
Err(crate::Error::LibSQLError(
"Error getting write lock on connection".to_string(),
))
}

async fn execute<T>(
connection: &Self::Connection,
query: crate::Query,
) -> Result<(), crate::Error>
where
T: serde::de::DeserializeOwned,
{
let start = std::time::Instant::now();
while start.elapsed() < TIMEOUT {
match connection.try_lock() {
Ok(conn) => return C::execute::<T>(&conn, query).await,
Err(_) => {
std::thread::sleep(std::time::Duration::from_millis(10));
}
}
}
Err(crate::Error::LibSQLError(
"Error getting write lock on connection in execute".to_string(),
))
}
}

fn convert_values(query: &crate::Query) -> Result<Vec<libsql::Value>, crate::Error> {
let mut parameters: Vec<libsql::Value> = Vec::new();

Expand Down
119 changes: 119 additions & 0 deletions geekorm-core/src/backends/libsql/mutex.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
use serde::{de::DeserializeOwned, Serialize};
use std::sync::Arc;

#[cfg(not(feature = "backends-tokio"))]
use std::sync::Mutex;
#[cfg(feature = "backends-tokio")]
use tokio::sync::Mutex;

use crate::{GeekConnection, QueryBuilderTrait, TableBuilder};

const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);

impl<C> GeekConnection for Arc<Mutex<C>>
where
Self: Sync + Send + 'static,
C: GeekConnection<Connection = libsql::Connection>,
{
type Connection = Arc<Mutex<libsql::Connection>>;

async fn create_table<T>(connection: &Self::Connection) -> Result<(), crate::Error>
where
T: TableBuilder + QueryBuilderTrait + Sized + Serialize + DeserializeOwned,
{
let start = std::time::Instant::now();
while start.elapsed() < TIMEOUT {
match connection.try_lock() {
Ok(conn) => return C::create_table::<T>(&conn).await,
Err(_) => {
std::thread::sleep(std::time::Duration::from_millis(10));
}
}
}
Err(crate::Error::LibSQLError(
"Error getting write lock on connection".to_string(),
))
}

async fn row_count(
connection: &Self::Connection,
query: crate::Query,
) -> Result<i64, crate::Error> {
let start = std::time::Instant::now();
while start.elapsed() < TIMEOUT {
match connection.try_lock() {
Ok(conn) => return C::row_count(&conn, query).await,
Err(_) => {
std::thread::sleep(std::time::Duration::from_millis(10));
}
}
}
Err(crate::Error::LibSQLError(
"Error getting write lock on connection in row_count".to_string(),
))
}

async fn query<T>(
connection: &Self::Connection,
query: crate::Query,
) -> Result<Vec<T>, crate::Error>
where
T: serde::de::DeserializeOwned,
{
let start = std::time::Instant::now();
while start.elapsed() < TIMEOUT {
match connection.try_lock() {
Ok(conn) => return C::query::<T>(&conn, query).await,
Err(_) => {
std::thread::sleep(std::time::Duration::from_millis(10));
}
}
}

Err(crate::Error::LibSQLError(
"Error getting write lock on connection".to_string(),
))
}

async fn query_first<T>(
connection: &Self::Connection,
query: crate::Query,
) -> Result<T, crate::Error>
where
T: serde::de::DeserializeOwned,
{
let start = std::time::Instant::now();
while start.elapsed() < TIMEOUT {
match connection.try_lock() {
Ok(conn) => return C::query_first::<T>(&conn, query).await,
Err(_) => {
std::thread::sleep(std::time::Duration::from_millis(10));
}
}
}
Err(crate::Error::LibSQLError(
"Error getting write lock on connection".to_string(),
))
}

async fn execute<T>(
connection: &Self::Connection,
query: crate::Query,
) -> Result<(), crate::Error>
where
T: serde::de::DeserializeOwned,
{
let start = std::time::Instant::now();
while start.elapsed() < TIMEOUT {
match connection.try_lock() {
Ok(conn) => return C::execute::<T>(&conn, query).await,
Err(_) => {
std::thread::sleep(std::time::Duration::from_millis(10));
}
}
}
Err(crate::Error::LibSQLError(
"Error getting write lock on connection in execute".to_string(),
))
}
}

0 comments on commit 7b7c2ef

Please sign in to comment.