diff --git a/lib/include/result_summary.rs b/lib/include/result_summary.rs index 17069f6..6901099 100644 --- a/lib/include/result_summary.rs +++ b/lib/include/result_summary.rs @@ -22,45 +22,18 @@ } // - // next_or_summary + // next + finish let mut stream = graph .execute(query("CREATE (n:Node {prop: 'frobnicate'}) RETURN n")) .await .unwrap(); - let Ok(Some(row)) = stream.next_or_summary().await else { panic!() }; - assert!(row.row().is_some()); - assert!(row.summary().is_none()); + let Ok(Some(row)) = stream.next().await else { panic!() }; + assert_item(row.to().unwrap()); - assert_item(row.row().unwrap().to().unwrap()); - - let Ok(Some(row)) = stream.next_or_summary().await else { panic!() }; - assert!(row.row().is_none()); - assert!(row.summary().is_some()); - - assert_summary(row.summary().unwrap()); - - - // - // as_items - - let mut stream = graph - .execute(query("CREATE (n:Node {prop: 'frobnicate'}) RETURN n")) - .await - .unwrap(); - - let items = stream.as_items::() - .try_collect::>() - .await - .unwrap(); - - for item in items { - match item { - RowItem::Row(row) => assert_item(row), - RowItem::Summary(summary) => assert_summary(&summary), - } - } + let Ok(summary) = stream.finish().await else { panic!() }; + assert_summary(&summary); // @@ -76,7 +49,7 @@ .await .unwrap(); - let Ok(Some(summary)) = stream.finish().await else { panic!() }; + let Ok(summary) = stream.finish().await else { panic!() }; for item in items { assert_item(item); diff --git a/lib/src/lib.rs b/lib/src/lib.rs index ef2f52a..af60dda 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -483,7 +483,7 @@ pub use crate::errors::{ pub use crate::graph::{query, Graph}; pub use crate::query::{Query, QueryParameter}; pub use crate::row::{Node, Path, Point2D, Point3D, Relation, Row, UnboundedRelation}; -pub use crate::stream::{DetachedRowStream, RowItem, RowStream}; +pub use crate::stream::{DetachedRowStream, RowStream}; pub use crate::txn::Txn; pub use crate::types::serde::{ DeError, EndNodeId, Id, Indices, Keys, Labels, Nodes, Offset, Relationships, StartNodeId, diff --git a/lib/src/stream.rs b/lib/src/stream.rs index 4710511..6a4c4d1 100644 --- a/lib/src/stream.rs +++ b/lib/src/stream.rs @@ -27,7 +27,7 @@ type BoxedSummary = Box; type BoxedSummary = (); #[cfg(feature = "unstable-bolt-protocol-impl-v2")] -type FinishResult = Option; +type FinishResult = ResultSummary; #[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))] type FinishResult = (); @@ -80,55 +80,79 @@ impl DetachedRowStream { } } -#[derive(Clone, Debug)] -pub enum RowItem { - Row(T), - #[cfg(feature = "unstable-result-summary")] - Summary(Box), -} - -impl RowItem { - pub fn row(&self) -> Option<&T> { - match self { - RowItem::Row(row) => Some(row), - #[cfg(feature = "unstable-result-summary")] - _ => None, - } - } - - #[cfg(feature = "unstable-result-summary")] - pub fn summary(&self) -> Option<&ResultSummary> { - match self { - RowItem::Summary(summary) => Some(summary), - _ => None, - } - } - - pub fn into_row(self) -> Option { - match self { - RowItem::Row(row) => Some(row), - #[cfg(feature = "unstable-result-summary")] - _ => None, - } - } - - #[cfg(feature = "unstable-result-summary")] - pub fn into_summary(self) -> Option> { - match self { - RowItem::Summary(summary) => Some(summary), - _ => None, - } - } -} - impl RowStream { /// A call to next() will return a row from an internal buffer if the buffer has any entries, /// if the buffer is empty and the server has more rows left to consume, then a new batch of rows /// are fetched from the server (using the fetch_size value configured see [`crate::ConfigBuilder::fetch_size`]) - pub async fn next(&mut self, handle: impl TransactionHandle) -> Result> { - self.next_or_summary(handle) - .await - .map(|item| item.and_then(RowItem::into_row)) + pub async fn next(&mut self, mut handle: impl TransactionHandle) -> Result> { + loop { + if let Some(row) = self.buffer.pop_front() { + return Ok(Some(row)); + } + + #[cfg(feature = "unstable-bolt-protocol-impl-v2")] + { + if self.state == State::Ready { + let pull = Pull::some(self.fetch_size as i64).for_query(self.qid); + let connection = handle.connection(); + connection.send_as(pull).await?; + self.state = loop { + let response = connection + .recv_as::, Streaming>>() + .await?; + match response { + Response::Detail(record) => { + let record = BoltList::from( + record + .into_iter() + .map(BoltType::from) + .collect::>(), + ); + let row = Row::new(self.fields.clone(), record); + self.buffer.push_back(row); + } + Response::Success(Streaming::HasMore) => break State::Ready, + Response::Success(Streaming::Done(mut s)) => { + s.set_t_first(self.available_after); + break State::Complete(s); + } + otherwise => return Err(otherwise.into_error("PULL")), + } + }; + } else if let State::Complete(_) = self.state { + break Ok(None); + } + } + + #[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))] + { + if self.state == State::Ready { + let pull = BoltRequest::pull(self.fetch_size, self.qid); + let connection = handle.connection(); + connection.send(pull).await?; + + self.state = loop { + match connection.recv().await { + Ok(BoltResponse::Success(s)) => { + break if s.get("has_more").unwrap_or(false) { + State::Ready + } else { + State::Complete(()) + }; + } + Ok(BoltResponse::Record(record)) => { + let row = Row::new(self.fields.clone(), record.data); + self.buffer.push_back(row); + } + Ok(msg) => return Err(msg.into_error("PULL")), + Err(e) => return Err(e), + } + }; + } else if let State::Complete(_) = self.state { + break Ok(None); + }; + } + } } /// Return the [`RowStream::next`] item, @@ -224,83 +248,6 @@ impl RowStream { .and_then(|row| row.to::().map_err(Error::DeserializationError)) } - pub async fn next_or_summary( - &mut self, - mut handle: impl TransactionHandle, - ) -> Result> { - loop { - if let Some(row) = self.buffer.pop_front() { - return Ok(Some(RowItem::Row(row))); - } - - #[cfg(feature = "unstable-bolt-protocol-impl-v2")] - { - if self.state == State::Ready { - let pull = Pull::some(self.fetch_size as i64).for_query(self.qid); - let connection = handle.connection(); - connection.send_as(pull).await?; - self.state = loop { - let response = connection - .recv_as::, Streaming>>() - .await?; - match response { - Response::Detail(record) => { - let record = BoltList::from( - record - .into_iter() - .map(BoltType::from) - .collect::>(), - ); - let row = Row::new(self.fields.clone(), record); - self.buffer.push_back(row); - } - Response::Success(Streaming::HasMore) => break State::Ready, - Response::Success(Streaming::Done(mut s)) => { - s.set_t_first(self.available_after); - break State::Complete(Some(s)); - } - otherwise => return Err(otherwise.into_error("PULL")), - } - }; - } else if let State::Complete(ref mut summary) = self.state { - break match summary.take() { - Some(summary) => Ok(Some(RowItem::Summary(summary))), - None => Ok(None), - }; - } - } - - #[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))] - { - if self.state == State::Ready { - let pull = BoltRequest::pull(self.fetch_size, self.qid); - let connection = handle.connection(); - connection.send(pull).await?; - - self.state = loop { - match connection.recv().await { - Ok(BoltResponse::Success(s)) => { - break if s.get("has_more").unwrap_or(false) { - State::Ready - } else { - State::Complete(None) - }; - } - Ok(BoltResponse::Record(record)) => { - let row = Row::new(self.fields.clone(), record.data); - self.buffer.push_back(row); - } - Ok(msg) => return Err(msg.into_error("PULL")), - Err(e) => return Err(e), - } - }; - } else if let State::Complete(_) = self.state { - break Ok(None); - }; - } - } - } - /// Stop consuming the stream and return a summary, if available. /// Stopping the stream will also discard any messages on the server side. pub async fn finish(mut self, mut handle: impl TransactionHandle) -> Result { @@ -317,25 +264,21 @@ impl RowStream { }?; let summary = match summary { Summary::Success(s) => match s.metadata { - Streaming::Done(summary) => Some(*summary), + Streaming::Done(summary) => *summary, Streaming::HasMore => { - // this should never happen - None + unreachable!("Query returned has_more after a discard_all"); } }, Summary::Ignored => { - self.state = State::Complete(None); return Err(Error::RequestIgnoredError); } Summary::Failure(f) => { - self.state = State::Complete(None); return Err(f.into_error()); } }; - self.state = State::Complete(None); Ok(summary) } - State::Complete(summary) => Ok(summary.map(|o| *o)), + State::Complete(summary) => Ok(*summary), } #[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))] @@ -352,7 +295,7 @@ impl RowStream { crate::messages::BoltResponse::Failure(f) => Err(Error::Neo4j(f.into_error())), msg => Err(msg.into_error("DISCARD")), }; - self.state = State::Complete(None); + self.state = State::Complete(()); summary } State::Complete(_) => Ok(()), @@ -372,17 +315,6 @@ impl RowStream { self.convert_rows(handle, Ok) } - /// Turns this RowStream into a [`futures::stream::TryStream`] where - /// every element is a [`RowItem`]. - /// - /// The stream can only be converted once. - pub fn as_row_items<'this, 'db: 'this>( - &'this mut self, - handle: impl TransactionHandle + 'db, - ) -> impl TryStream + 'this { - self.convert_with_summary(handle, Ok) - } - /// Turns this RowStream into a [`futures::stream::TryStream`] where /// every row is converted into a `T` by calling [`crate::row::Row::to`]. /// @@ -396,17 +328,6 @@ impl RowStream { self.convert_rows(handle, |row| row.to::()) } - /// Turns this RowStream into a [`futures::stream::TryStream`] where - /// every row is converted into a [`RowItem`] by calling [`crate::row::Row::to`]. - /// - /// The stream can only be converted once. - pub fn as_items<'this, 'db: 'this, T: DeserializeOwned + 'this>( - &'this mut self, - handle: impl TransactionHandle + 'db, - ) -> impl TryStream, Error = Error> + 'this { - self.convert_with_summary(handle, |row| row.to::()) - } - /// Turns this RowStream into a [`futures::stream::TryStream`] where /// the value at the given column is converted into a `T` /// by calling [`crate::row::Row::get`]. @@ -421,58 +342,18 @@ impl RowStream { self.convert_rows(handle, move |row| row.get::(column)) } - /// Turns this RowStream into a [`futures::stream::TryStream`] where - /// the value at the given column is converted into a [`RowItem`] - /// by calling [`crate::row::Row::get`]. - /// - /// The stream can only be converted once. - pub fn column_to_items<'this, 'db: 'this, T: DeserializeOwned + 'db>( - &'this mut self, - handle: impl TransactionHandle + 'db, - column: &'db str, - ) -> impl TryStream, Error = Error> + 'this { - self.convert_with_summary(handle, move |row| row.get::(column)) - } - fn convert_rows<'this, 'db: 'this, T: 'this>( &'this mut self, handle: impl TransactionHandle + 'db, convert: impl Fn(Row) -> Result + 'this, ) -> impl TryStream + 'this { try_unfold((self, handle, convert), |(stream, mut hd, de)| async move { - match stream.next_or_summary(&mut hd).await { - Ok(Some(RowItem::Row(row))) => match de(row) { + match stream.next(&mut hd).await? { + Some(row) => match de(row) { Ok(res) => Ok(Some((res, (stream, hd, de)))), Err(e) => Err(Error::DeserializationError(e)), }, - #[cfg(feature = "unstable-result-summary")] - Ok(Some(RowItem::Summary(summary))) => { - stream.state = State::Complete(Some(summary)); - Ok(None) - } - Ok(None) => Ok(None), - Err(e) => Err(e), - } - }) - } - - fn convert_with_summary<'this, 'db: 'this, T>( - &'this mut self, - handle: impl TransactionHandle + 'db, - convert: impl Fn(Row) -> Result + 'this, - ) -> impl TryStream, Error = Error> + 'this { - try_unfold((self, handle, convert), |(stream, mut hd, de)| async move { - match stream.next_or_summary(&mut hd).await { - Ok(Some(RowItem::Row(row))) => match de(row) { - Ok(res) => Ok(Some((RowItem::Row(res), (stream, hd, de)))), - Err(e) => Err(Error::DeserializationError(e)), - }, - #[cfg(feature = "unstable-result-summary")] - Ok(Some(RowItem::Summary(summary))) => { - Ok(Some((RowItem::Summary(summary), (stream, hd, de)))) - } - Ok(None) => Ok(None), - Err(e) => Err(e), + None => Ok(None), } }) } @@ -557,13 +438,6 @@ impl DetachedRowStream { self.stream.pop_as() } - /// A call to next_or_summary() will return a row from an internal buffer if the buffer has any entries, - /// if the buffer is empty and the server has more rows left to consume, then a new batch of rows - /// are fetched from the server (using the fetch_size value configured see [`crate::ConfigBuilder::fetch_size`]) - pub async fn next_or_summary(&mut self) -> Result> { - self.stream.next_or_summary(&mut self.connection).await - } - /// Stop consuming the stream and return a summary, if available. /// Stopping the stream will also discard any messages on the server side. pub async fn finish(mut self) -> Result { @@ -580,14 +454,6 @@ impl DetachedRowStream { self.stream.into_stream(&mut self.connection) } - /// Turns this RowStream into a [`futures::stream::TryStream`] where - /// every element is a [`RowItem`]. - /// - /// The stream can only be converted once. - pub fn as_row_items(&mut self) -> impl TryStream + '_ { - self.stream.as_row_items(&mut self.connection) - } - /// Turns this RowStream into a [`futures::stream::TryStream`] where /// every row is converted into a `T` by calling [`crate::row::Row::to`]. /// @@ -600,16 +466,6 @@ impl DetachedRowStream { self.stream.into_stream_as(&mut self.connection) } - /// Turns this RowStream into a [`futures::stream::TryStream`] where - /// every row is converted into a [`RowItem`] by calling [`crate::row::Row::to`]. - /// - /// The stream can only be converted once. - pub fn as_items<'this, T: DeserializeOwned + 'this>( - &'this mut self, - ) -> impl TryStream, Error = Error> + 'this { - self.stream.as_items(&mut self.connection) - } - /// Turns this RowStream into a [`futures::stream::TryStream`] where /// the value at the given column is converted into a `T` /// by calling [`crate::row::Row::get`]. @@ -622,22 +478,10 @@ impl DetachedRowStream { ) -> impl TryStream + 'this { self.stream.column_into_stream(&mut self.connection, column) } - - /// Turns this RowStream into a [`futures::stream::TryStream`] where - /// the value at the given column is converted into a [`RowItem`] - /// by calling [`crate::row::Row::get`]. - /// - /// The stream can only be converted once. - pub fn column_to_items<'this, 'db: 'this, T: DeserializeOwned + 'db>( - &'this mut self, - column: &'db str, - ) -> impl TryStream, Error = Error> + 'this { - self.stream.column_to_items(&mut self.connection, column) - } } #[derive(Clone, PartialEq, Debug)] enum State { Ready, - Complete(Option), + Complete(BoxedSummary), }