Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change query entrypoints to use Into<Query> instead of just Query #213

Merged
merged 2 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions lib/examples/concurrent_writes.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use futures::stream::{self, StreamExt, TryStreamExt};
use neo4rs::{query, ConfigBuilder, Graph};
use neo4rs::{ConfigBuilder, Graph};

#[tokio::main]
async fn main() {
Expand Down Expand Up @@ -35,7 +35,7 @@ async fn main() {

async fn work(i: u64, graph: Graph) -> (u64, u64, u64) {
graph
.run(query(
.run(
"
CREATE
(dan:Person {name: 'Dan'}),
Expand Down Expand Up @@ -77,12 +77,12 @@ CREATE
(elsa)-[:BUYS {amount: 3}]->(chocolate),
(elsa)-[:BUYS {amount: 3}]->(milk)
",
))
)
.await
.unwrap();

let node_count = graph
.execute(query("MATCH (n) RETURN count(n) AS count"))
.execute("MATCH (n) RETURN count(n) AS count")
.await
.unwrap()
.column_into_stream::<u64>("count")
Expand All @@ -91,7 +91,7 @@ CREATE
.unwrap();

let rel_count = graph
.execute(query("MATCH ()-[r]->() RETURN count(r) AS count"))
.execute("MATCH ()-[r]->() RETURN count(r) AS count")
.await
.unwrap()
.column_into_stream::<u64>("count")
Expand Down
34 changes: 20 additions & 14 deletions lib/src/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ impl Graph {
/// Use [`Graph::run`] for cases where you just want a write operation
///
/// use [`Graph::execute`] when you are interested in the result stream
pub async fn run(&self, q: Query) -> Result<()> {
self.impl_run_on(self.config.db.clone(), q, Operation::Write)
pub async fn run(&self, q: impl Into<Query>) -> Result<()> {
self.impl_run_on(self.config.db.clone(), q.into(), Operation::Write)
.await
}

Expand All @@ -178,15 +178,16 @@ impl Graph {
pub async fn run_on(
&self,
db: impl Into<Database>,
q: Query,
q: impl Into<Query>,
operation: Operation,
) -> Result<()> {
self.impl_run_on(Some(db.into()), q, operation).await
self.impl_run_on(Some(db.into()), q.into(), operation).await
}

#[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))]
pub async fn run_on(&self, db: impl Into<Database>, q: Query) -> Result<()> {
self.impl_run_on(Some(db.into()), q, Operation::Write).await
pub async fn run_on(&self, db: impl Into<Database>, q: impl Into<Query>) -> Result<()> {
self.impl_run_on(Some(db.into()), q.into(), Operation::Write)
.await
}

#[allow(unused_variables)]
Expand Down Expand Up @@ -229,8 +230,8 @@ impl Graph {
/// All errors with the `Transient` error class as well as a few other error classes are considered retryable.
/// This includes errors during a leader election or when the transaction resources on the server (memory, handles, ...) are exhausted.
/// Retries happen with an exponential backoff until a retry delay exceeds 60s, at which point the query fails with the last error as it would without any retry.
pub async fn execute(&self, q: Query) -> Result<DetachedRowStream> {
self.impl_execute_on(self.config.db.clone(), q, Operation::Write)
pub async fn execute(&self, q: impl Into<Query>) -> Result<DetachedRowStream> {
self.impl_execute_on(self.config.db.clone(), q.into(), Operation::Write)
.await
}

Expand All @@ -240,8 +241,8 @@ impl Graph {
/// All errors with the `Transient` error class as well as a few other error classes are considered retryable.
/// This includes errors during a leader election or when the transaction resources on the server (memory, handles, ...) are exhausted.
/// Retries happen with an exponential backoff until a retry delay exceeds 60s, at which point the query fails with the last error as it would without any retry.
pub async fn execute_read(&self, q: Query) -> Result<DetachedRowStream> {
self.impl_execute_on(self.config.db.clone(), q, Operation::Read)
pub async fn execute_read(&self, q: impl Into<Query>) -> Result<DetachedRowStream> {
self.impl_execute_on(self.config.db.clone(), q.into(), Operation::Read)
.await
}

Expand All @@ -255,10 +256,11 @@ impl Graph {
pub async fn execute_on(
&self,
db: impl Into<Database>,
q: Query,
q: impl Into<Query>,
operation: Operation,
) -> Result<DetachedRowStream> {
self.impl_execute_on(Some(db.into()), q, operation).await
self.impl_execute_on(Some(db.into()), q.into(), operation)
.await
}

/// Executes a query on the provided database and returns a [`DetachedRowStream`]
Expand All @@ -267,8 +269,12 @@ impl Graph {
/// All errors with the `Transient` error class as well as a few other error classes are considered retryable.
/// This includes errors during a leader election or when the transaction resources on the server (memory, handles, ...) are exhausted.
#[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))]
pub async fn execute_on(&self, db: impl Into<Database>, q: Query) -> Result<DetachedRowStream> {
self.impl_execute_on(Some(db.into()), q, Operation::Write)
pub async fn execute_on(
&self,
db: impl Into<Database>,
q: impl Into<Query>,
) -> Result<DetachedRowStream> {
self.impl_execute_on(Some(db.into()), q.into(), Operation::Write)
.await
}

Expand Down
8 changes: 4 additions & 4 deletions lib/src/txn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ impl Txn {
}

/// Runs a single query and discards the stream.
pub async fn run(&mut self, q: Query) -> Result<()> {
let mut query = q.clone();
pub async fn run(&mut self, q: impl Into<Query>) -> Result<()> {
let mut query = q.into();
if let Some(db) = self.db.as_ref() {
query = query.extra("db", db.to_string());
}
Expand All @@ -68,8 +68,8 @@ impl Txn {
}

/// Executes a query and returns a [`RowStream`]
pub async fn execute(&mut self, q: Query) -> Result<RowStream> {
let mut query = q.clone();
pub async fn execute(&mut self, q: impl Into<Query>) -> Result<RowStream> {
let mut query = q.into();
if let Some(db) = self.db.as_ref() {
query = query.extra("db", db.to_string());
}
Expand Down
21 changes: 6 additions & 15 deletions lib/tests/node_property_parsing.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use chrono::{DateTime, FixedOffset};
use neo4rs::{query, Node, Point2D, Point3D};
use neo4rs::{Node, Point2D, Point3D};

mod container;

Expand All @@ -9,31 +9,25 @@ async fn node_property_parsing() {
let graph = neo4j.graph();

graph
.run(query(
.run(
"CREATE
(:Datetime {p1:DATETIME('2024-12-31T08:10:35')}),
(:Point2D {a:Point ({x:2,y:3})}),
(:Point3D {a:Point ({x:3,y:4,z:5})})
",
))
)
.await
.unwrap();

let mut result = graph
.execute(query("MATCH (p:DateTime) RETURN p"))
.await
.unwrap();
let mut result = graph.execute("MATCH (p:DateTime) RETURN p").await.unwrap();

while let Ok(Some(row)) = result.next().await {
let node: Node = row.get("p").unwrap();
let p1 = node.get::<DateTime<FixedOffset>>("p1").unwrap();
assert_eq!(p1.timestamp(), 1735632635);
}

let mut result = graph
.execute(query("MATCH (p:Point2D) RETURN p"))
.await
.unwrap();
let mut result = graph.execute("MATCH (p:Point2D) RETURN p").await.unwrap();

while let Ok(Some(row)) = result.next().await {
let node: Node = row.get("p").unwrap();
Expand All @@ -42,10 +36,7 @@ async fn node_property_parsing() {
assert_eq!(p1.y(), 3.0);
}

let mut result = graph
.execute(query("MATCH (p:Point3D) RETURN p"))
.await
.unwrap();
let mut result = graph.execute("MATCH (p:Point3D) RETURN p").await.unwrap();

while let Ok(Some(row)) = result.next().await {
let node: Node = row.get("p").unwrap();
Expand Down
8 changes: 3 additions & 5 deletions lib/tests/use_default_db.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use futures::TryStreamExt;
use neo4rs::*;
use neo4rs::{query, Operation};

mod container;

Expand Down Expand Up @@ -27,13 +27,11 @@ async fn use_default_db() {

#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
let query_stream = graph
.execute_on("system", query("SHOW DEFAULT DATABASE"), Operation::Read)
.execute_on("system", "SHOW DEFAULT DATABASE", Operation::Read)
.await;

#[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))]
let query_stream = graph
.execute_on("system", query("SHOW DEFAULT DATABASE"))
.await;
let query_stream = graph.execute_on("system", "SHOW DEFAULT DATABASE").await;

let default_db = query_stream
.unwrap()
Expand Down
Loading