Skip to content

Commit

Permalink
Bolt 5.7: GQL compliant server errors
Browse files Browse the repository at this point in the history
  • Loading branch information
robsdedude committed Jan 29, 2025
1 parent 0eb4d91 commit ab68bff
Show file tree
Hide file tree
Showing 21 changed files with 988 additions and 288 deletions.
1 change: 1 addition & 0 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ jobs:
- 5.9-enterprise-cluster-neo4j
- 5.13-enterprise-cluster-neo4j
- 5.23-enterprise-cluster-neo4j
- 5.26-enterprise-cluster-neo4j
include:
- tests: STUB_TESTS
config: ""
Expand Down
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@
- Add support for Bolt 5.6 (GQL compatible notifications/result statuses)
- ⚠️ `neo4j::driver::notification::NotificationFilter`'s API has been completely reworked to support this new feature and enable more internal changes in the future without breaking the API again.
- ⚠️ changed `neo4j::summary::Summary::notifications` from `Option<Vec<Notification>>` to `Vec<Notification>` defaulting to `Vec::new()` when the server does not send any notifications.
- Add support for Bolt 5.7 (GQL compatible errors)
- ⚠️ `neo4j::error::ServerError` is now `#[non_exhaustive]`
- ⚠️ `neo4j::error::ServerError::new(...)` has been removed.
User-code should not need to create arbitrary `ServerError`s.
In return, `ServerError` now implements `Clone`.

**🔧 Fixes**
- Rework `neo4j::value::graph::Path`
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ A bump in MSRV is considered a minor breaking change.
* [x] 5.5 (never released)
* [x] 5.6 (GQL notifications)
* [ ] 5.7
* [ ] (GQL errors)
* [x] (GQL errors)
* [ ] (new bolt version handshake)
* [ ] 5.8 (home db resolution cache)
* [x] Types
Expand Down
4 changes: 4 additions & 0 deletions neo4j/src/driver/io/bolt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ mod bolt5x2;
mod bolt5x3;
mod bolt5x4;
mod bolt5x6;
mod bolt5x7;
mod bolt_state;
mod chunk;
mod handshake;
Expand Down Expand Up @@ -61,6 +62,7 @@ use bolt5x2::{Bolt5x2, Bolt5x2StructTranslator};
use bolt5x3::{Bolt5x3, Bolt5x3StructTranslator};
use bolt5x4::{Bolt5x4, Bolt5x4StructTranslator};
use bolt5x6::{Bolt5x6, Bolt5x6StructTranslator};
use bolt5x7::{Bolt5x7, Bolt5x7StructTranslator};
use bolt_state::{BoltState, BoltStateTracker};
use chunk::{Chunker, Dechunker};
pub(crate) use handshake::{open, TcpConnector};
Expand Down Expand Up @@ -177,6 +179,7 @@ impl<RW: Read + Write> Bolt<RW> {
data: BoltData::new(version, stream, socket, local_port, address),
// [bolt-version-bump] search tag when changing bolt version support
protocol: match version {
(5, 7) => Bolt5x7::<Bolt5x7StructTranslator>::default().into(),
(5, 6) => Bolt5x6::<Bolt5x6StructTranslator>::default().into(),
(5, 4) => Bolt5x4::<Bolt5x4StructTranslator>::default().into(),
(5, 3) => Bolt5x3::<Bolt5x3StructTranslator>::default().into(),
Expand Down Expand Up @@ -492,6 +495,7 @@ enum BoltProtocolVersion {
V5x3(Bolt5x3<Bolt5x3StructTranslator>),
V5x4(Bolt5x4<Bolt5x4StructTranslator>),
V5x6(Bolt5x6<Bolt5x6StructTranslator>),
V5x7(Bolt5x7<Bolt5x7StructTranslator>),
}

#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
Expand Down
16 changes: 8 additions & 8 deletions neo4j/src/driver/io/bolt/bolt5x0/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ impl<T: BoltStructTranslator> Bolt5x0<T> {
protocol_version,
}
}

pub(in super::super) fn try_parse_error(meta: ValueReceive) -> Result<ServerError> {
let meta = meta
.try_into_map()
.map_err(|_| Neo4jError::protocol_error("FAILURE meta was not a Dictionary"))?;
Ok(ServerError::from_meta(meta))
}
}

impl<T: BoltStructTranslator> Default for Bolt5x0<T> {
Expand Down Expand Up @@ -992,7 +999,7 @@ impl<T: BoltStructTranslator> BoltProtocol for Bolt5x0<T> {
assert_response_field_count("FAILURE", &fields, 1)?;
let meta = fields.pop().unwrap();
bolt_debug!(bolt_data, "S: FAILURE {}", meta.dbg_print());
let mut error = try_parse_error(meta)?;
let mut error = Self::try_parse_error(meta)?;
bolt_data.bolt_state.failure();
match on_server_error {
None => response.callbacks.on_failure(error),
Expand Down Expand Up @@ -1032,10 +1039,3 @@ impl<T: BoltStructTranslator> BoltProtocol for Bolt5x0<T> {
}
}
}

fn try_parse_error(meta: ValueReceive) -> Result<ServerError> {
let meta = meta
.try_into_map()
.map_err(|_| Neo4jError::protocol_error("FAILURE meta was not a Dictionary"))?;
Ok(ServerError::from_meta(meta))
}
2 changes: 2 additions & 0 deletions neo4j/src/driver/io/bolt/bolt5x6/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ impl<T: BoltStructTranslator> Default for Bolt5x6<T> {
}

impl<T: BoltStructTranslator> BoltProtocol for Bolt5x6<T> {
#[inline]
fn hello<RW: Read + Write>(
&mut self,
data: &mut BoltData<RW>,
Expand Down Expand Up @@ -202,6 +203,7 @@ impl<T: BoltStructTranslator> BoltProtocol for Bolt5x6<T> {
self.bolt5x4.route(data, parameters, callbacks)
}

#[inline]
fn telemetry<RW: Read + Write>(
&mut self,
data: &mut BoltData<RW>,
Expand Down
19 changes: 19 additions & 0 deletions neo4j/src/driver/io/bolt/bolt5x7.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright Rouven Bauer
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod protocol;
mod translator;

pub(crate) use protocol::Bolt5x7;
pub(crate) use translator::Bolt5x7StructTranslator;
264 changes: 264 additions & 0 deletions neo4j/src/driver/io/bolt/bolt5x7/protocol.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
// Copyright Rouven Bauer
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::borrow::Borrow;
use std::collections::HashMap;
use std::fmt::Debug;
use std::io::{Read, Write};

use crate::error::ServerError;
use log::{debug, warn};

use super::super::bolt5x6::Bolt5x6;
use super::super::bolt_common::ServerAwareBoltVersion;
use super::super::message::BoltMessage;
use super::super::message_parameters::{
BeginParameters, CommitParameters, DiscardParameters, GoodbyeParameters, HelloParameters,
PullParameters, ReauthParameters, ResetParameters, RollbackParameters, RouteParameters,
RunParameters, TelemetryParameters,
};
use super::super::response::BoltMeta;
use super::super::{
assert_response_field_count, bolt_debug, bolt_debug_extra, dbg_extra, BoltData, BoltProtocol,
BoltStructTranslator, OnServerErrorCb, ResponseCallbacks,
};
use crate::error_::Result;
use crate::value::ValueReceive;
use crate::Neo4jError;

#[derive(Debug)]
pub(crate) struct Bolt5x7<T: BoltStructTranslator> {
pub(in super::super) bolt5x6: Bolt5x6<T>,
}

impl<T: BoltStructTranslator> Bolt5x7<T> {
pub(in super::super) fn new(protocol_version: ServerAwareBoltVersion) -> Self {
Self {
bolt5x6: Bolt5x6::new(protocol_version),
}
}

pub(in super::super) fn try_parse_error(meta: ValueReceive) -> Result<ServerError> {
let meta = meta
.try_into_map()
.map_err(|_| Neo4jError::protocol_error("FAILURE meta was not a Dictionary"))?;
Ok(ServerError::from_meta_gql(meta))
}

pub(in super::super) fn enrich_failure_diag_record(mut meta: &mut BoltMeta) {
loop {
if let Some(diag_record) = meta
.entry(String::from("diagnostic_record"))
.or_insert_with(|| ValueReceive::Map(HashMap::with_capacity(3)))
.as_map_mut()
{
for (key, value) in &[
("OPERATION", ""),
("OPERATION_CODE", "0"),
("CURRENT_SCHEMA", "/"),
] {
diag_record
.entry(String::from(*key))
.or_insert_with(|| ValueReceive::String(String::from(*value)));
}
}
match meta.get_mut("cause").and_then(ValueReceive::as_map_mut) {
None => break,
Some(cause) => meta = cause,
};
}
}
}

impl<T: BoltStructTranslator> Default for Bolt5x7<T> {
fn default() -> Self {
Self::new(ServerAwareBoltVersion::V5x7)
}
}

impl<T: BoltStructTranslator> BoltProtocol for Bolt5x7<T> {
#[inline]
fn hello<RW: Read + Write>(
&mut self,
data: &mut BoltData<RW>,
parameters: HelloParameters,
) -> Result<()> {
self.bolt5x6.hello(data, parameters)
}

#[inline]
fn reauth<RW: Read + Write>(
&mut self,
data: &mut BoltData<RW>,
parameters: ReauthParameters,
) -> Result<()> {
self.bolt5x6.reauth(data, parameters)
}

#[inline]
fn supports_reauth(&self) -> bool {
self.bolt5x6.supports_reauth()
}

#[inline]
fn goodbye<RW: Read + Write>(
&mut self,
data: &mut BoltData<RW>,
parameters: GoodbyeParameters,
) -> Result<()> {
self.bolt5x6.goodbye(data, parameters)
}

#[inline]
fn reset<RW: Read + Write>(
&mut self,
data: &mut BoltData<RW>,
parameters: ResetParameters,
) -> Result<()> {
self.bolt5x6.reset(data, parameters)
}

#[inline]
fn run<RW: Read + Write, KP: Borrow<str> + Debug, KM: Borrow<str> + Debug>(
&mut self,
data: &mut BoltData<RW>,
parameters: RunParameters<KP, KM>,
callbacks: ResponseCallbacks,
) -> Result<()> {
self.bolt5x6.run(data, parameters, callbacks)
}

#[inline]
fn discard<RW: Read + Write>(
&mut self,
data: &mut BoltData<RW>,
parameters: DiscardParameters,
callbacks: ResponseCallbacks,
) -> Result<()> {
self.bolt5x6.discard(data, parameters, callbacks)
}

#[inline]
fn pull<RW: Read + Write>(
&mut self,
data: &mut BoltData<RW>,
parameters: PullParameters,
callbacks: ResponseCallbacks,
) -> Result<()> {
self.bolt5x6.pull(data, parameters, callbacks)
}

#[inline]
fn begin<RW: Read + Write, K: Borrow<str> + Debug>(
&mut self,
data: &mut BoltData<RW>,
parameters: BeginParameters<K>,
callbacks: ResponseCallbacks,
) -> Result<()> {
self.bolt5x6.begin(data, parameters, callbacks)
}

#[inline]
fn commit<RW: Read + Write>(
&mut self,
data: &mut BoltData<RW>,
parameters: CommitParameters,
callbacks: ResponseCallbacks,
) -> Result<()> {
self.bolt5x6.commit(data, parameters, callbacks)
}

#[inline]
fn rollback<RW: Read + Write>(
&mut self,
data: &mut BoltData<RW>,
parameters: RollbackParameters,
) -> Result<()> {
self.bolt5x6.rollback(data, parameters)
}

#[inline]
fn route<RW: Read + Write>(
&mut self,
data: &mut BoltData<RW>,
parameters: RouteParameters,
callbacks: ResponseCallbacks,
) -> Result<()> {
self.bolt5x6.route(data, parameters, callbacks)
}

#[inline]
fn telemetry<RW: Read + Write>(
&mut self,
data: &mut BoltData<RW>,
parameters: TelemetryParameters,
callbacks: ResponseCallbacks,
) -> Result<()> {
self.bolt5x6.telemetry(data, parameters, callbacks)
}

#[inline]
fn load_value<R: Read>(&mut self, reader: &mut R) -> Result<ValueReceive> {
self.bolt5x6.load_value(reader)
}

#[inline]
fn handle_response<RW: Read + Write>(
&mut self,
bolt_data: &mut BoltData<RW>,
message: BoltMessage<ValueReceive>,
on_server_error: OnServerErrorCb<RW>,
) -> Result<()> {
match message {
BoltMessage {
tag: 0x7F,
mut fields,
} => {
// FAILURE
let mut response = bolt_data
.responses
.pop_front()
.expect("called Bolt::read_one with empty response queue");

assert_response_field_count("FAILURE", &fields, 1)?;
let mut meta = fields.pop().unwrap();
bolt_debug!(bolt_data, "S: FAILURE {}", meta.dbg_print());
meta.as_map_mut().map(Self::enrich_failure_diag_record);
let mut error = Self::try_parse_error(meta)?;
bolt_data.bolt_state.failure();
match on_server_error {
None => response.callbacks.on_failure(error),
Some(cb) => {
let res1 = cb(bolt_data, &mut error);
let res2 = response.callbacks.on_failure(error);
match res1 {
Ok(()) => res2,
Err(e1) => {
if let Err(e2) = res2 {
warn!(
"server error swallowed because of user callback error: {e2}"
);
}
Err(e1)
}
}
}
}
}
message => self
.bolt5x6
.handle_response(bolt_data, message, on_server_error),
}
}
}
Loading

0 comments on commit ab68bff

Please sign in to comment.