Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions scylla-rust-wrapper/src/cass_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,8 @@ impl TryFrom<CassConsistency> for Consistency {
CassConsistency::CASS_CONSISTENCY_LOCAL_QUORUM => Ok(Consistency::LocalQuorum),
CassConsistency::CASS_CONSISTENCY_EACH_QUORUM => Ok(Consistency::EachQuorum),
CassConsistency::CASS_CONSISTENCY_LOCAL_ONE => Ok(Consistency::LocalOne),
CassConsistency::CASS_CONSISTENCY_LOCAL_SERIAL => Ok(Consistency::LocalSerial),
CassConsistency::CASS_CONSISTENCY_SERIAL => Ok(Consistency::Serial),
_ => Err(()),
}
}
Expand Down
65 changes: 65 additions & 0 deletions scylla-rust-wrapper/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use scylla::{SessionBuilder, SessionConfig};
use std::collections::HashMap;
use std::convert::TryInto;
use std::future::Future;
use std::num::NonZeroU32;
use std::os::raw::{c_char, c_int, c_uint};
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -43,6 +44,12 @@ const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_millis(5000);
const DEFAULT_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(30);
// - keepalive timeout is 60 secs
const DEFAULT_KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(60);
// - tracing info fetch timeout is 15 millis
const DEFAULT_TRACING_INFO_FETCH_TIMEOUT: Duration = Duration::from_millis(15);
// - tracing info fetch interval is 3 millis
const DEFAULT_TRACING_INFO_FETCH_INTERVAL: Duration = Duration::from_millis(3);
// - tracing consistency is ONE
const DEFAULT_TRACING_CONSISTENCY: Consistency = Consistency::One;

const DRIVER_NAME: &str = "ScyllaDB Cpp-Rust Driver";
const DRIVER_VERSION: &str = env!("CARGO_PKG_VERSION");
Expand Down Expand Up @@ -109,6 +116,11 @@ pub struct CassCluster {
auth_password: Option<String>,

client_id: Option<uuid::Uuid>,

/// The default timeout for tracing info fetch.
/// Rust-driver only defines the number of retries.
/// However, this can be easily computed: `tracing_max_wait_time / tracing_retry_wait_time`.
tracing_max_wait_time: Duration,
}

impl CassCluster {
Expand Down Expand Up @@ -155,6 +167,19 @@ pub fn build_session_builder(
session_builder = session_builder.user(username, password)
}

// Compute the number of retries for tracing info fetch
// based on the timeout and interval provided by user.
let tracing_info_fetch_attemps = {
let attemps = cluster.tracing_max_wait_time.as_millis()
/ session_builder
.config
.tracing_info_fetch_interval
.as_millis();

NonZeroU32::new(attemps as u32).unwrap_or_else(|| NonZeroU32::new(1).unwrap())
};
session_builder = session_builder.tracing_info_fetch_attempts(tracing_info_fetch_attemps);

async move {
let load_balancing = load_balancing_config.clone().build().await;
execution_profile_builder = execution_profile_builder.load_balancing_policy(load_balancing);
Expand Down Expand Up @@ -183,6 +208,8 @@ pub unsafe extern "C" fn cass_cluster_new() -> *mut CassCluster {
.connection_timeout(DEFAULT_CONNECT_TIMEOUT)
.keepalive_interval(DEFAULT_KEEPALIVE_INTERVAL)
.keepalive_timeout(DEFAULT_KEEPALIVE_TIMEOUT)
.tracing_info_fetch_interval(DEFAULT_TRACING_INFO_FETCH_INTERVAL)
.tracing_info_fetch_consistency(DEFAULT_TRACING_CONSISTENCY)
};

Box::into_raw(Box::new(CassCluster {
Expand All @@ -198,6 +225,7 @@ pub unsafe extern "C" fn cass_cluster_new() -> *mut CassCluster {
execution_profile_map: Default::default(),
load_balancing_config: Default::default(),
client_id: None,
tracing_max_wait_time: DEFAULT_TRACING_INFO_FETCH_TIMEOUT,
}))
}

Expand Down Expand Up @@ -408,6 +436,43 @@ pub unsafe extern "C" fn cass_cluster_set_request_timeout(
})
}

#[no_mangle]
pub unsafe extern "C" fn cass_cluster_set_tracing_max_wait_time(
cluster_raw: *mut CassCluster,
max_wait_time_ms: c_uint,
) {
let cluster = ptr_to_ref_mut(cluster_raw);

cluster.tracing_max_wait_time = Duration::from_millis(max_wait_time_ms.into());
}

#[no_mangle]
pub unsafe extern "C" fn cass_cluster_set_tracing_retry_wait_time(
cluster_raw: *mut CassCluster,
retry_wait_time_ms: c_uint,
) {
let cluster = ptr_to_ref_mut(cluster_raw);

cluster.session_builder.config.tracing_info_fetch_interval =
Duration::from_millis(retry_wait_time_ms.into());
}

#[no_mangle]
pub unsafe extern "C" fn cass_cluster_set_tracing_consistency(
cluster_raw: *mut CassCluster,
consistency: CassConsistency,
) {
let cluster = ptr_to_ref_mut(cluster_raw);

let consistency = Consistency::try_from(consistency)
.expect("Invalid consistency passed to `cass_cluster_set_tracing_consistency`.");

cluster
.session_builder
.config
.tracing_info_fetch_consistency = consistency;
}

#[no_mangle]
pub unsafe extern "C" fn cass_cluster_set_port(
cluster_raw: *mut CassCluster,
Expand Down