From 74ad5d3d081491603d24a46a8583b4c9e420d427 Mon Sep 17 00:00:00 2001 From: Sonic Build Admin Date: Wed, 10 Sep 2025 08:09:44 +0000 Subject: [PATCH] Move swss-common/swss-common-testing crates from https://github.com/sonic-net/sonic-dash-ha Move swss-common/swss-common-testing crates from https://github.com/sonic-net/sonic-dash-ha After this PR, remove code in https://github.com/sonic-net/sonic-dash-ha/pull/86 --- .azure-pipelines/build-template.yml | 12 +- Cargo.toml | 52 +++ crates/swss-common-testing/Cargo.toml | 17 + crates/swss-common-testing/src/lib.rs | 268 ++++++++++++++ crates/swss-common/Cargo.toml | 31 ++ crates/swss-common/build.rs | 54 +++ crates/swss-common/src/lib.rs | 27 ++ crates/swss-common/src/types.rs | 314 +++++++++++++++++ crates/swss-common/src/types/async_util.rs | 46 +++ .../src/types/consumerstatetable.rs | 82 +++++ crates/swss-common/src/types/cxxstring.rs | 328 ++++++++++++++++++ crates/swss-common/src/types/dbconnector.rs | 249 +++++++++++++ crates/swss-common/src/types/exception.rs | 79 +++++ crates/swss-common/src/types/logger.rs | 92 +++++ .../src/types/producerstatetable.rs | 78 +++++ .../src/types/subscriberstatetable.rs | 85 +++++ crates/swss-common/src/types/table.rs | 119 +++++++ crates/swss-common/src/types/zmqclient.rs | 61 ++++ .../src/types/zmqconsumerstatetable.rs | 92 +++++ .../src/types/zmqproducerstatetable.rs | 67 ++++ crates/swss-common/src/types/zmqserver.rs | 39 +++ crates/swss-common/tests/async.rs | 220 ++++++++++++ crates/swss-common/tests/logger.rs | 49 +++ crates/swss-common/tests/logger_fallback.rs | 22 ++ crates/swss-common/tests/sync.rs | 223 ++++++++++++ debian/rules | 8 + 26 files changed, 2710 insertions(+), 4 deletions(-) create mode 100644 Cargo.toml create mode 100644 crates/swss-common-testing/Cargo.toml create mode 100644 crates/swss-common-testing/src/lib.rs create mode 100644 crates/swss-common/Cargo.toml create mode 100644 crates/swss-common/build.rs create mode 100644 crates/swss-common/src/lib.rs create mode 100644 crates/swss-common/src/types.rs create mode 100644 crates/swss-common/src/types/async_util.rs create mode 100644 crates/swss-common/src/types/consumerstatetable.rs create mode 100644 crates/swss-common/src/types/cxxstring.rs create mode 100644 crates/swss-common/src/types/dbconnector.rs create mode 100644 crates/swss-common/src/types/exception.rs create mode 100644 crates/swss-common/src/types/logger.rs create mode 100644 crates/swss-common/src/types/producerstatetable.rs create mode 100644 crates/swss-common/src/types/subscriberstatetable.rs create mode 100644 crates/swss-common/src/types/table.rs create mode 100644 crates/swss-common/src/types/zmqclient.rs create mode 100644 crates/swss-common/src/types/zmqconsumerstatetable.rs create mode 100644 crates/swss-common/src/types/zmqproducerstatetable.rs create mode 100644 crates/swss-common/src/types/zmqserver.rs create mode 100644 crates/swss-common/tests/async.rs create mode 100644 crates/swss-common/tests/logger.rs create mode 100644 crates/swss-common/tests/logger_fallback.rs create mode 100644 crates/swss-common/tests/sync.rs diff --git a/.azure-pipelines/build-template.yml b/.azure-pipelines/build-template.yml index b2eff50..e4d6b71 100644 --- a/.azure-pipelines/build-template.yml +++ b/.azure-pipelines/build-template.yml @@ -102,6 +102,14 @@ jobs: set -ex sudo pip install Pympler==0.8 pytest sudo apt-get install -y redis-server + sudo dpkg -i libswsscommon_*.deb + sudo dpkg -i libswsscommon-dev_*.deb + sudo dpkg -i python3-swsscommon_*.deb + + ps aux + cargo test --workspace --all-features + cargo test --release --workspace --all-features + sudo sed -i 's/notify-keyspace-events ""/notify-keyspace-events AKE/' /etc/redis/redis.conf sudo sed -ri 's/^# unixsocket/unixsocket/' /etc/redis/redis.conf sudo sed -ri 's/^unixsocketperm .../unixsocketperm 777/' /etc/redis/redis.conf @@ -109,10 +117,6 @@ jobs: sudo service redis-server restart sudo mkdir /usr/local/yang-models - sudo dpkg -i libswsscommon_*.deb - sudo dpkg -i libswsscommon-dev_*.deb - sudo dpkg -i python3-swsscommon_*.deb - ./tests/tests redis-cli FLUSHALL pytest-3 --cov=. --cov-report=xml diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..87a8614 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,52 @@ +[workspace] +resolver = '2' +members = [ + "crates/swss-common", + "crates/swss-common-testing", +] +exclude = [] + +[workspace.package] +version = "0.1.0" +authors = ["SONiC"] +license = "Apache-2.0" +repository = "https://github.com/sonic/sonic-swss-common" +documentation = "https://github.com/sonic-net/sonic-swss-common/blob/master/README.md" +keywords = ["sonic", "swss-common"] +edition = "2021" + +[workspace.lints.rust] +unused_extern_crates = 'warn' +trivial_numeric_casts = 'warn' +unstable_features = 'warn' +unused_import_braces = 'warn' + +[workspace.dependencies] +# Async framework +tokio = { version = "1.37", features = ["macros", "rt-multi-thread", "signal"] } +tokio-util = { version = "0.7", features = ["rt"] } +tokio-stream = "0.1" + +# Log and error handling +tracing = { version = "0.1", features = ["log"] } +tracing-error = "0.2" +tracing-subscriber = { version = "0.3", features = ["env-filter", "serde"] } +syslog-tracing = "0.3" +thiserror = "1" +anyhow = "1" +human-panic = "2" +better-panic = "0.3" +signal-hook = "0.3" + +# Serialization +serde = { version = "1", features = ["derive", "rc"] } +serde_json = "1" +serde_yaml = "0.9" +serde_with = "3.12" + +getset = "0.1" +lazy_static = "1.4" + +# Internal dependencies +swss-common = { version = "0.1.0", path = "crates/swss-common" } +swss-common-testing = { version = "0.1.0", path = "crates/swss-common-testing" } diff --git a/crates/swss-common-testing/Cargo.toml b/crates/swss-common-testing/Cargo.toml new file mode 100644 index 0000000..8525979 --- /dev/null +++ b/crates/swss-common-testing/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "swss-common-testing" +version.workspace = true +authors.workspace = true +license.workspace = true +repository.workspace = true +documentation.workspace = true +keywords.workspace = true +edition.workspace = true + +[dependencies] +swss-common = { path = "../swss-common" } +rand = "0.8.5" +lazy_static.workspace = true + +[lints] +workspace = true diff --git a/crates/swss-common-testing/src/lib.rs b/crates/swss-common-testing/src/lib.rs new file mode 100644 index 0000000..7b9c587 --- /dev/null +++ b/crates/swss-common-testing/src/lib.rs @@ -0,0 +1,268 @@ +use std::{ + collections::HashMap, + fs::{self, remove_file}, + io::{BufRead, BufReader}, + iter, + process::{Child, Command, Stdio}, + sync::Mutex, +}; + +use lazy_static::lazy_static; +use std::sync::Arc; + +use rand::{random, Rng}; + +use swss_common::*; + +lazy_static! { + static ref CONFIG_DB: Mutex>> = Mutex::new(None); +} + +pub struct Redis { + pub proc: Child, + pub sock: String, +} + +impl Redis { + /// Start a Redis instance with a random unix socket. Multiple instances can be started. + /// It is mutually exclusive with start_config_db(). + pub fn start() -> Self { + sonic_db_config_init_for_test(); + Redis::start_with_sock(random_unix_sock()) + } + + /// Start a Redis with config_db. Only one instance can be started at a time. + /// It is mutually exclusive with start(). + pub fn start_config_db() -> Arc { + CONFIG_DB + .lock() + .unwrap() + .get_or_insert_with(|| { + let sock_str = random_unix_sock(); + config_db_config_init_for_test(&sock_str); + Arc::new(Redis::start_with_sock(sock_str)) + }) + .clone() + } + + fn start_with_sock(sock: String) -> Self { + #[rustfmt::skip] + #[allow(clippy::zombie_processes)] + let mut child = Command::new("timeout") + .args([ + "--signal=KILL", + "15s", + "redis-server", + "--appendonly", "no", + "--save", "", + "--notify-keyspace-events", "AKE", + "--port", "0", + "--unixsocket", &sock, + ]) + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + let mut stdout = BufReader::new(child.stdout.take().unwrap()); + let mut buf = String::new(); + loop { + buf.clear(); + if stdout.read_line(&mut buf).unwrap() == 0 { + panic!("Redis didn't start"); + } + // Some redis version capitalize "Ready", others have lowercase "ready" :P + if buf.contains("eady to accept connections") { + break Self { proc: child, sock }; + } + } + } + + pub fn db_connector(&self) -> DbConnector { + DbConnector::new_unix(0, &self.sock, 0).unwrap() + } +} + +impl Drop for Redis { + fn drop(&mut self) { + Command::new("kill") + .args(["-s", "TERM", &self.proc.id().to_string()]) + .status() + .unwrap(); + self.proc.wait().unwrap(); + } +} + +pub struct Defer(Option); + +impl Drop for Defer { + fn drop(&mut self) { + self.0.take().unwrap()() + } +} + +const DB_CONFIG_JSON: &str = r#" + { + "DATABASES": { + "db name doesn't matter": { + "id": 0, + "separator": ":", + "instance": "redis" + } + } + } + "#; + +const CONFIG_DB_REDIS_CONFIG_JSON: &str = r#" + { + "INSTANCES": { + "redis": { + "hostname": "127.0.0.1", + "port": {port}, + "unix_socket_path": "{path}", + "persistence_for_warm_boot": "yes" + } + }, + "DATABASES": { + "APPL_DB": { + "id": 0, + "separator": ":", + "instance": "redis" + }, + "CONFIG_DB": { + "id": 1, + "separator": "|", + "instance": "redis" + }, + "STATE_DB": { + "id": 2, + "separator": "|", + "instance": "redis" + }, + "DPU_STATE_DB": { + "id": 3, + "separator": "|", + "instance": "redis" + }, + "DPU_APPL_DB": { + "id": 4, + "separator": ":", + "instance": "redis" + } + } + } +"#; + +const DB_GLOBAL_CONFIG_JSON: &str = r#" + { + "INCLUDES" : [ + { + "include" : "db_config_test.json" + }, + { + "container_name" : "dpu0", + "include" : "db_config_test.json" + + } + ] + } +"#; + +static SONIC_DB_INITIALIZED: Mutex = Mutex::new(false); +static CONIFG_DB_INITIALIZED: Mutex = Mutex::new(false); + +pub fn sonic_db_config_init_for_test() { + // HACK + // We need to do our own locking here because locking is not correctly implemented in + // swss::SonicDBConfig :/ + let config_db_init = CONIFG_DB_INITIALIZED.lock().unwrap(); + // config_db and sonic_db are mutually exclusive because the lock in swss::SonicDBConfig + assert!(!*config_db_init); + + let mut sonic_db_init = SONIC_DB_INITIALIZED.lock().unwrap(); + if !*sonic_db_init { + fs::write("/tmp/db_config_test.json", DB_CONFIG_JSON).unwrap(); + fs::write("/tmp/db_global_config_test.json", DB_GLOBAL_CONFIG_JSON).unwrap(); + sonic_db_config_initialize("/tmp/db_config_test.json").unwrap(); + sonic_db_config_initialize_global("/tmp/db_global_config_test.json").unwrap(); + fs::remove_file("/tmp/db_config_test.json").unwrap(); + fs::remove_file("/tmp/db_global_config_test.json").unwrap(); + *sonic_db_init = true; + } +} + +fn config_db_config_init_for_test(sock_str: &str) { + // HACK + // We need to do our own locking here because locking is not correctly implemented in + // swss::SonicDBConfig :/ + let sonic_db_init = SONIC_DB_INITIALIZED.lock().unwrap(); + // config_db and sonic_db are mutually exclusive because the lock in swss::SonicDBConfig + assert!(!*sonic_db_init); + + let mut config_db_init = CONIFG_DB_INITIALIZED.lock().unwrap(); + if !*config_db_init { + let port = random_port(); + let db_config_json = CONFIG_DB_REDIS_CONFIG_JSON + .replace("{port}", &port.to_string()) + .replace("{path}", sock_str); + fs::write("/tmp/db_config_test.json", db_config_json).unwrap(); + fs::write("/tmp/db_global_config_test.json", DB_GLOBAL_CONFIG_JSON).unwrap(); + sonic_db_config_initialize("/tmp/db_config_test.json").unwrap(); + sonic_db_config_initialize_global("/tmp/db_global_config_test.json").unwrap(); + fs::remove_file("/tmp/db_config_test.json").unwrap(); + fs::remove_file("/tmp/db_global_config_test.json").unwrap(); + *config_db_init = true; + } +} + +pub fn random_string() -> String { + format!("{:0X}", random::()) +} + +pub fn random_cxx_string() -> CxxString { + CxxString::new(random_string()) +} + +pub fn random_fvs() -> FieldValues { + let mut field_values = HashMap::new(); + for _ in 0..rand::thread_rng().gen_range(100..1000) { + field_values.insert(random_string(), random_cxx_string()); + } + field_values +} + +pub fn random_kfv() -> KeyOpFieldValues { + let key = random_string(); + let operation = if random() { KeyOperation::Set } else { KeyOperation::Del }; + let field_values = if operation == KeyOperation::Set { + // We need at least one field-value pair, otherwise swss::BinarySerializer infers that + // the operation is DEL even if the .operation field is SET + random_fvs() + } else { + HashMap::new() + }; + + KeyOpFieldValues { + key, + operation, + field_values, + } +} + +pub fn random_kfvs() -> Vec { + iter::repeat_with(random_kfv).take(100).collect() +} + +pub fn random_unix_sock() -> String { + format!("/tmp/swss-common-testing-{}.sock", random_string()) +} + +pub fn random_port() -> u16 { + let mut rng = rand::thread_rng(); + rng.gen_range(1000..65535) +} + +// zmq doesn't clean up its own ipc sockets, so we include a deferred operation for that +pub fn random_zmq_endpoint() -> (String, impl Drop) { + let sock = random_unix_sock(); + let endpoint = format!("ipc://{sock}"); + (endpoint, Defer(Some(|| remove_file(sock).unwrap()))) +} diff --git a/crates/swss-common/Cargo.toml b/crates/swss-common/Cargo.toml new file mode 100644 index 0000000..bfe17f2 --- /dev/null +++ b/crates/swss-common/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "swss-common" +version.workspace = true +authors.workspace = true +license.workspace = true +repository.workspace = true +documentation.workspace = true +keywords.workspace = true +edition.workspace = true + +[lints] +workspace = true + +[features] +async = ["dep:tokio"] + +[dependencies] +libc = "0.2.158" +tokio = { version = "1", optional = true, features = ["net", "rt"] } +serde.workspace = true +getset.workspace = true +lazy_static.workspace = true +tracing-subscriber.workspace = true + +[build-dependencies] +bindgen = "0.70.1" + +[dev-dependencies] +swss-common-testing = { path = "../swss-common-testing" } +paste = "1.0.15" +tokio = { version = "1", features = ["rt-multi-thread", "macros", "time"] } diff --git a/crates/swss-common/build.rs b/crates/swss-common/build.rs new file mode 100644 index 0000000..e89a3bb --- /dev/null +++ b/crates/swss-common/build.rs @@ -0,0 +1,54 @@ +use std::{ + env, fs, + path::{Path, PathBuf}, +}; + +fn main() { + let header_paths = match env::var_os("SWSS_COMMON_REPO") { + // if SWSS_COMMON_REPO is specified, we will build this library based on the files found in the repo + Some(path_string) => { + // Get full path of repo + let path = Path::new(&path_string); + let path = fs::canonicalize(path).expect("canonicalizing SWSS_COMMON_REPO path"); + assert!(path.exists(), "{path:?} doesn't exist"); + + // Add libs path to linker search + let libs_path = path.join("common/.libs").to_str().unwrap().to_string(); + println!("cargo:rustc-link-search=native={libs_path}"); + + // include all of common/c-api/*.h + find_headers_in_dir(path.join("common/c-api")) + } + + // otherwise, we will assume the libswsscommon-dev package is installed and files + // will be found at the locations defined in /debian/libswsscommon-dev.install + None => { + eprintln!("NOTE: If you are compiling without installing libswsscommon-dev, you must set SWSS_COMMON_REPO to the path of a built copy of the sonic-swss-common repo"); + find_headers_in_dir("/usr/include/swss/c-api") + } + }; + + for h in &header_paths { + println!("cargo:rerun-if-changed={h}"); + } + + let out_path = PathBuf::from(env::var("OUT_DIR").unwrap()).join("bindings.rs"); + bindgen::builder() + .headers(&header_paths) + .derive_partialeq(true) + .generate() + .unwrap() + .write_to_file(out_path) + .unwrap(); + + println!("cargo:rustc-link-lib=dylib=swsscommon"); +} + +fn find_headers_in_dir(path: impl AsRef) -> Vec { + fs::read_dir(path) + .unwrap() + .map(|res_entry| res_entry.unwrap().path()) + .filter(|p| p.extension().is_some_and(|ext| ext == "h")) + .map(|p| p.to_str().unwrap().to_string()) + .collect::>() +} diff --git a/crates/swss-common/src/lib.rs b/crates/swss-common/src/lib.rs new file mode 100644 index 0000000..4082429 --- /dev/null +++ b/crates/swss-common/src/lib.rs @@ -0,0 +1,27 @@ +mod bindings { + #![allow(unused, non_snake_case, non_upper_case_globals, non_camel_case_types)] + include!(concat!(env!("OUT_DIR"), "/bindings.rs")); +} +mod types; + +pub use types::*; + +/// Rust wrapper around `swss::SonicDBConfig::initialize`. +pub fn sonic_db_config_initialize(path: &str) -> Result<(), Exception> { + let path = cstr(path); + unsafe { swss_try!(bindings::SWSSSonicDBConfig_initialize(path.as_ptr())) } +} + +/// Rust wrapper around `swss::SonicDBConfig::initializeGlobalConfig`. +pub fn sonic_db_config_initialize_global(path: &str) -> Result<(), Exception> { + let path = cstr(path); + unsafe { swss_try!(bindings::SWSSSonicDBConfig_initializeGlobalConfig(path.as_ptr())) } +} + +/// Trait for objects that can be stored in a Sonic DB table. +pub trait SonicDbTable { + fn key_separator() -> char; + fn table_name() -> &'static str; + fn db_name() -> &'static str; + fn is_dpu() -> bool; +} diff --git a/crates/swss-common/src/types.rs b/crates/swss-common/src/types.rs new file mode 100644 index 0000000..0bc286c --- /dev/null +++ b/crates/swss-common/src/types.rs @@ -0,0 +1,314 @@ +#[cfg(feature = "async")] +mod async_util; + +mod consumerstatetable; +mod cxxstring; +mod dbconnector; +mod exception; +mod logger; +mod producerstatetable; +mod subscriberstatetable; +mod table; +mod zmqclient; +mod zmqconsumerstatetable; +mod zmqproducerstatetable; +mod zmqserver; + +pub use consumerstatetable::ConsumerStateTable; +pub use cxxstring::{CxxStr, CxxString}; +pub use dbconnector::{DbConnectionInfo, DbConnector}; +pub use exception::{Exception, Result}; +pub use logger::{link_to_swsscommon_logger, log_level, log_output, LoggerConfigChangeHandler}; +pub use producerstatetable::ProducerStateTable; +pub use subscriberstatetable::SubscriberStateTable; +pub use table::Table; +pub use zmqclient::ZmqClient; +pub use zmqconsumerstatetable::ZmqConsumerStateTable; +pub use zmqproducerstatetable::ZmqProducerStateTable; +pub use zmqserver::ZmqServer; + +pub(crate) use exception::swss_try; + +use crate::bindings::*; +use cxxstring::RawMutableSWSSString; +use serde::{Deserialize, Serialize}; +use std::{ + any::Any, + collections::HashMap, + error::Error, + ffi::{CStr, CString}, + fmt::Display, + slice, + str::FromStr, +}; + +pub(crate) fn cstr(s: impl AsRef<[u8]>) -> CString { + CString::new(s.as_ref()).expect("Bytes being converted to a C string already contains a null byte") +} + +/// Take a malloc'd c string and convert it to a native String +pub(crate) unsafe fn take_cstr(p: *const libc::c_char) -> String { + let s = CStr::from_ptr(p) + .to_str() + .expect("C string being converted to Rust String contains invalid UTF-8") + .to_string(); + libc::free(p as *mut libc::c_void); + s +} + +/// Rust version of the return type from `swss::Select::select`. +/// +/// This enum does not include the `swss::Select::ERROR` because errors are handled via a different +/// mechanism in this library. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum SelectResult { + /// Data is now available. + /// (`swss::Select::OBJECT`) + Data, + /// Waiting was interrupted by a signal. + /// (`swss::Select::SIGNALINT`) + Signal, + /// Timed out. + /// (`swss::Select::TIMEOUT`) + Timeout, +} + +impl SelectResult { + pub(crate) fn from_raw(raw: SWSSSelectResult) -> Self { + if raw == SWSSSelectResult_SWSSSelectResult_DATA { + SelectResult::Data + } else if raw == SWSSSelectResult_SWSSSelectResult_SIGNAL { + SelectResult::Signal + } else if raw == SWSSSelectResult_SWSSSelectResult_TIMEOUT { + SelectResult::Timeout + } else { + unreachable!("Invalid SWSSSelectResult: {raw}"); + } + } +} + +/// Type of the `operation` field in [KeyOpFieldValues]. +/// +/// In swsscommon, this is represented as a string of `"SET"` or `"DEL"`. +/// This type can be constructed similarly - `let op: KeyOperation = "SET".parse().unwrap()`. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +pub enum KeyOperation { + Set, + Del, +} + +impl KeyOperation { + pub(crate) fn as_raw(self) -> SWSSKeyOperation { + match self { + KeyOperation::Set => SWSSKeyOperation_SWSSKeyOperation_SET, + KeyOperation::Del => SWSSKeyOperation_SWSSKeyOperation_DEL, + } + } + + pub(crate) fn from_raw(raw: SWSSKeyOperation) -> Self { + if raw == SWSSKeyOperation_SWSSKeyOperation_SET { + KeyOperation::Set + } else if raw == SWSSKeyOperation_SWSSKeyOperation_DEL { + KeyOperation::Del + } else { + unreachable!("Invalid SWSSKeyOperation: {raw}"); + } + } +} + +impl FromStr for KeyOperation { + type Err = InvalidKeyOperationString; + + /// Create a KeyOperation from `"SET"` or `"DEL"` (case insensitive). + fn from_str(s: &str) -> Result { + let Ok(mut bytes): Result<[u8; 3], _> = s.as_bytes().try_into() else { + return Err(InvalidKeyOperationString(s.to_string())); + }; + bytes.make_ascii_uppercase(); + match &bytes { + b"SET" => Ok(Self::Set), + b"DEL" => Ok(Self::Del), + _ => Err(InvalidKeyOperationString(s.to_string())), + } + } +} + +/// Error type indicating that a `KeyOperation` string was neither `"SET"` nor `"DEL"`. +#[derive(Debug)] +pub struct InvalidKeyOperationString(String); + +impl Display for InvalidKeyOperationString { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, r#"A KeyOperation String must be "SET" or "DEL", but was {}"#, self.0) + } +} + +impl Error for InvalidKeyOperationString {} + +/// Rust version of `vector`. +pub type FieldValues = HashMap; + +/// Rust version of `swss::KeyOpFieldsValuesTuple`. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct KeyOpFieldValues { + pub key: String, + pub operation: KeyOperation, + pub field_values: FieldValues, +} + +impl KeyOpFieldValues { + pub fn set(key: K, fvs: I) -> Self + where + K: Into, + I: IntoIterator, + F: Into, + V: Into, + { + Self { + key: key.into(), + operation: KeyOperation::Set, + field_values: fvs.into_iter().map(|(f, v)| (f.into(), v.into())).collect(), + } + } + + pub fn del(key: K) -> Self + where + K: Into, + { + Self { + key: key.into(), + operation: KeyOperation::Del, + field_values: HashMap::new(), + } + } +} + +/// Intended for testing, ordered by key. +impl PartialOrd for KeyOpFieldValues { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +/// Intended for testing, ordered by key. +impl Ord for KeyOpFieldValues { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.key.cmp(&other.key) + } +} + +/// Takes ownership of an `SWSSFieldValueArray` and turns it into a native representation. +pub(crate) unsafe fn take_field_value_array(arr: SWSSFieldValueArray) -> FieldValues { + let mut out = HashMap::with_capacity(arr.len as usize); + if !arr.data.is_null() { + let entries = slice::from_raw_parts(arr.data, arr.len as usize); + for fv in entries { + let field = take_cstr(fv.field); + let value = CxxString::take(fv.value).unwrap(); + out.insert(field, value); + } + SWSSFieldValueArray_free(arr); + } + out +} + +/// Takes ownership of an `SWSSKeyOpFieldValuesArray` and turns it into a native representation. +pub(crate) unsafe fn take_key_op_field_values_array(kfvs: SWSSKeyOpFieldValuesArray) -> Vec { + let mut out = Vec::with_capacity(kfvs.len as usize); + if !kfvs.data.is_null() { + unsafe { + let entries = slice::from_raw_parts(kfvs.data, kfvs.len as usize); + for kfv in entries { + let key = take_cstr(kfv.key); + let operation = KeyOperation::from_raw(kfv.operation); + let field_values = take_field_value_array(kfv.fieldValues); + out.push(KeyOpFieldValues { + key, + operation, + field_values, + }); + } + SWSSKeyOpFieldValuesArray_free(kfvs); + }; + } + out +} + +/// Takes ownership of an `SWSSStringArray` and turns it into a native representation. +pub(crate) unsafe fn take_string_array(arr: SWSSStringArray) -> Vec { + let out = if !arr.data.is_null() { + let entries = slice::from_raw_parts(arr.data, arr.len as usize); + Vec::from_iter(entries.iter().map(|&s| take_cstr(s))) + } else { + Vec::new() + }; + SWSSStringArray_free(arr); + out +} + +pub(crate) fn make_field_value_array(fvs: I) -> (SWSSFieldValueArray, KeepAlive) +where + I: IntoIterator, + F: AsRef<[u8]>, + V: Into, +{ + let mut k = KeepAlive::default(); + let mut data = Vec::new(); + + for (field, value) in fvs { + let field = cstr(field); + let value_cxxstring: CxxString = value.into(); + let value_rawswssstring: RawMutableSWSSString = value_cxxstring.into_raw(); + data.push(SWSSFieldValueTuple { + field: field.as_ptr(), + value: value_rawswssstring.as_raw(), + }); + k.keep((field, value_rawswssstring)); + } + + let arr = SWSSFieldValueArray { + data: data.as_mut_ptr(), + len: data.len().try_into().unwrap(), + }; + k.keep(data); + + (arr, k) +} + +pub(crate) fn make_key_op_field_values_array(kfvs: I) -> (SWSSKeyOpFieldValuesArray, KeepAlive) +where + I: IntoIterator, +{ + let mut k = KeepAlive::default(); + let mut data = Vec::new(); + + for kfv in kfvs { + let key = cstr(kfv.key); + let operation = kfv.operation.as_raw(); + let (field_values, arr_k) = make_field_value_array(kfv.field_values); + data.push(SWSSKeyOpFieldValues { + key: key.as_ptr(), + operation, + fieldValues: field_values, + }); + k.keep(Box::new((key, arr_k))) + } + + let arr = SWSSKeyOpFieldValuesArray { + data: data.as_mut_ptr(), + len: data.len().try_into().unwrap(), + }; + k.keep(Box::new(data)); + + (arr, k) +} + +/// Helper struct to keep rust-owned data alive while it is in use by C++ +#[derive(Default)] +pub(crate) struct KeepAlive(Vec>); + +impl KeepAlive { + fn keep(&mut self, t: T) { + self.0.push(Box::new(t)) + } +} diff --git a/crates/swss-common/src/types/async_util.rs b/crates/swss-common/src/types/async_util.rs new file mode 100644 index 0000000..154e115 --- /dev/null +++ b/crates/swss-common/src/types/async_util.rs @@ -0,0 +1,46 @@ +/// Take a blocking closure and run it to completion in a [`tokio::task::spawn_blocking`] thread. +pub(crate) async fn spawn_blocking_scoped T + Send, T: Send + 'static>(f: F) -> T { + let clos: Box T + Send> = Box::new(f); + // SAFETY: We are joining the spawned task before the current lifetime ends, so it is sound to + // access any data borrowed from this function in the spawned task. However, I acknowledge that + // extending lifetimes with transmute is terrible. Hail Mary. + let clos_static: Box T + Send + 'static> = unsafe { std::mem::transmute(clos) }; + ::tokio::task::spawn_blocking(clos_static).await.unwrap() +} + +/// These are "basic" because they just use spawn_blocking instead of a more specialized implementation. +/// See [`tokio::task::spawn_blocking`]. +macro_rules! impl_basic_async_method { + // Method (with self) + ($async_fn:ident <= $sync_fn:ident $(<$($generic_decls:tt),*>)? (& $(mut)? self $(, $arg:ident : $typ:ty)*) $(-> $ret_ty:ty)? $(where $($generic_bounds:tt)*)?) => { + #[doc = concat!("Async version of [`", stringify!($sync_fn), "`](Self::", stringify!($sync_fn), ") that uses `tokio::task::spawn_blocking`.")] + pub async fn $async_fn $(<$($generic_decls),*>)? (&mut self $(, $arg: $typ)*) $(-> $ret_ty)? $(where $($generic_bounds)*)? { + $crate::types::async_util::spawn_blocking_scoped(move || self.$sync_fn($($arg),*)).await + } + }; + + // Associated fn (without self) + ($async_fn:ident <= $sync_fn:ident $(<$($generic_decls:tt),*>)? ($($arg:ident : $typ:ty),*) $(-> $ret_ty:ty)? $(where $($generic_bounds:tt)*)?) => { + #[doc = concat!("Async version of [`", stringify!($sync_fn), "`](Self::", stringify!($sync_fn), ") that uses `tokio::task::spawn_blocking`.")] + pub async fn $async_fn $(<$($generic_decls),*>)? ($($arg: $typ),*) $(-> $ret_ty)? $(where $($generic_bounds)*)? { + $crate::types::async_util::spawn_blocking_scoped(move || Self::$sync_fn($($arg),*)).await + } + }; +} +pub(crate) use impl_basic_async_method; + +macro_rules! impl_read_data_async { + () => { + /// Async version of [`read_data`](Self::read_data). Does not time out or interrupt on signal. + pub async fn read_data_async(&mut self) -> ::std::io::Result<()> { + use ::tokio::io::{unix::AsyncFd, Interest}; + + let fd = self.get_fd().map_err(::std::io::Error::other)?; + let _ready_guard = AsyncFd::with_interest(fd, Interest::READABLE)?.readable().await?; + self.read_data(Duration::from_secs(0), false) + .map_err(::std::io::Error::other)?; + Ok(()) + } + }; +} +pub(crate) use impl_read_data_async; diff --git a/crates/swss-common/src/types/consumerstatetable.rs b/crates/swss-common/src/types/consumerstatetable.rs new file mode 100644 index 0000000..7789b72 --- /dev/null +++ b/crates/swss-common/src/types/consumerstatetable.rs @@ -0,0 +1,82 @@ +use super::*; +use crate::bindings::*; +use std::{os::fd::BorrowedFd, ptr::null, time::Duration}; + +/// Rust wrapper around `swss::ConsumerStateTable`. +#[derive(Debug)] +pub struct ConsumerStateTable { + ptr: SWSSConsumerStateTable, + db: DbConnector, + table_name: String, +} + +impl ConsumerStateTable { + pub fn new(db: DbConnector, table_name: &str, pop_batch_size: Option, pri: Option) -> Result { + let table_name_string = String::from(table_name); + let table_name = cstr(table_name); + let pop_batch_size = pop_batch_size.as_ref().map(|n| n as *const i32).unwrap_or(null()); + let pri = pri.as_ref().map(|n| n as *const i32).unwrap_or(null()); + let ptr = unsafe { + swss_try!(p_cst => SWSSConsumerStateTable_new(db.ptr, table_name.as_ptr(), pop_batch_size, pri, p_cst))? + }; + Ok(Self { + ptr, + db, + table_name: table_name_string, + }) + } + + pub fn pops(&self) -> Result> { + unsafe { + let arr = swss_try!(p_arr => SWSSConsumerStateTable_pops(self.ptr, p_arr))?; + Ok(take_key_op_field_values_array(arr)) + } + } + + pub fn get_fd(&self) -> Result { + // SAFETY: This fd represents the underlying redis connection, which should stay alive + // as long as the DbConnector does. + unsafe { + let fd = swss_try!(p_fd => SWSSConsumerStateTable_getFd(self.ptr, p_fd))?; + let fd = BorrowedFd::borrow_raw(fd.try_into().unwrap()); + Ok(fd) + } + } + + pub fn read_data(&self, timeout: Duration, interrupt_on_signal: bool) -> Result { + let timeout_ms = timeout.as_millis().try_into().unwrap(); + let res = unsafe { + swss_try!(p_res => { + SWSSConsumerStateTable_readData(self.ptr, timeout_ms, interrupt_on_signal as u8, p_res) + })? + }; + Ok(SelectResult::from_raw(res)) + } + + pub fn db_connector(&self) -> &DbConnector { + &self.db + } + + pub fn db_connector_mut(&mut self) -> &mut DbConnector { + &mut self.db + } + + pub fn table_name(&self) -> &str { + &self.table_name + } +} + +impl Drop for ConsumerStateTable { + fn drop(&mut self) { + unsafe { swss_try!(SWSSConsumerStateTable_free(self.ptr)).expect("Dropping ConsumerStateTable") }; + } +} + +unsafe impl Send for ConsumerStateTable {} + +/// Async versions of methods +#[cfg(feature = "async")] +impl ConsumerStateTable { + async_util::impl_read_data_async!(); + async_util::impl_basic_async_method!(pops_async <= pops(&self) -> Result>); +} diff --git a/crates/swss-common/src/types/cxxstring.rs b/crates/swss-common/src/types/cxxstring.rs new file mode 100644 index 0000000..c9989e9 --- /dev/null +++ b/crates/swss-common/src/types/cxxstring.rs @@ -0,0 +1,328 @@ +use crate::bindings::*; +use serde::{Deserialize, Serialize}; +use std::{ + borrow::{Borrow, Cow}, + fmt::Debug, + hash::Hash, + ops::Deref, + ptr::NonNull, + slice, + str::Utf8Error, +}; + +/// A C++ `std::string` that can be moved around and accessed from Rust. +#[repr(transparent)] +#[derive(Eq)] +pub struct CxxString { + ptr: NonNull, +} + +impl CxxString { + /// Construct a CxxString from an SWSSString. + /// The `CxxString` is now responsible for freeing the string, so it should not be freed manually. + /// Returns `None` if `s` is null. + pub(crate) unsafe fn take(s: SWSSString) -> Option { + NonNull::new(s).map(|ptr| CxxString { ptr }) + } + + /// Convert `self` into a wrapper type which provides safe mutable access to the underlying `std::string`. + pub(crate) fn into_raw(self) -> RawMutableSWSSString { + RawMutableSWSSString(self) + } + + /// Copies the given data into a new C++ string. + pub fn new(data: impl AsRef<[u8]>) -> CxxString { + unsafe { + let ptr = data.as_ref().as_ptr() as *const libc::c_char; + let len = data.as_ref().len().try_into().unwrap(); + CxxString::take(SWSSString_new(ptr, len)).unwrap() + } + } + + /// Borrows a `CxxStr` from this string. + /// + /// Like `String::as_str`, this method is unnecessary where deref coercion can be used. + pub fn as_cxx_str(&self) -> &CxxStr { + self + } +} + +impl Serialize for CxxString { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + self.as_cxx_str().serialize(serializer) + } +} + +impl<'de> Deserialize<'de> for CxxString { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + struct CxxStringVisitor; + + impl serde::de::Visitor<'_> for CxxStringVisitor { + type Value = CxxString; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("string or bytes") + } + + fn visit_bytes(self, v: &[u8]) -> Result + where + E: serde::de::Error, + { + Ok(CxxString::new(v)) + } + + fn visit_str(self, v: &str) -> Result + where + E: serde::de::Error, + { + Ok(CxxString::new(v)) + } + } + + deserializer.deserialize_bytes(CxxStringVisitor) + } +} + +/// Wrapper type around `CxxString` which disables access to methods that borrow the underlying +/// data, like `.deref()`. This prevents code from creating an `SWSSString` and `SWSSStrRef` at the +/// same time, which would cause a data race in multi-threaded contexts. +/// +/// A similar struct could be made which would allow access via just an `&mut CxxString`, but any +/// function taking `SWSSString` will destroy the underlying data anyway, so we might as well just +/// drop it when we're done. +/// +/// This newtype needs to exist (as opposed to into_raw returning the raw pointer) so that the +/// string is not dropped immediately. +pub(crate) struct RawMutableSWSSString(CxxString); + +impl RawMutableSWSSString { + pub(crate) fn as_raw(&self) -> SWSSString { + self.0.ptr.as_ptr() + } +} + +impl> From for CxxString { + fn from(bytes: T) -> Self { + CxxString::new(bytes.as_ref()) + } +} + +impl Drop for CxxString { + fn drop(&mut self) { + unsafe { SWSSString_free(self.ptr.as_ptr()) } + } +} + +/// This calls [CxxStr::to_string_lossy] which may clone the underlying data (i.e. may be expensive). +impl Debug for CxxString { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.deref().fmt(f) + } +} + +impl Clone for CxxString { + fn clone(&self) -> Self { + CxxString::new(self.as_bytes()) + } +} + +/// SAFETY: A C++ string has no thread related state. +unsafe impl Send for CxxString {} + +/// SAFETY: A `CxxString` can only be mutated through `.into_raw()` which takes ownership of `self`. +/// Thus, normal Rust borrow checking rules will prevent data races. +unsafe impl Sync for CxxString {} + +impl Deref for CxxString { + type Target = CxxStr; + + fn deref(&self) -> &Self::Target { + // SAFETY: CxxString and CxxStr are both repr(transparent) and identical in alignment & + // size, and the C API guarantees that SWSSString can always be cast into SWSSStrRef + unsafe { std::mem::transmute(self) } + } +} + +impl PartialOrd for CxxString { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for CxxString { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.deref().cmp(other) + } +} + +impl PartialEq for CxxString { + fn eq(&self, other: &Self) -> bool { + self.deref().eq(other) + } +} + +impl PartialEq for CxxString { + fn eq(&self, other: &CxxStr) -> bool { + self.deref().eq(other) + } +} + +impl PartialEq for CxxString { + fn eq(&self, other: &str) -> bool { + self.deref().eq(other) + } +} + +impl PartialEq<&str> for CxxString { + fn eq(&self, other: &&str) -> bool { + self.deref().eq(other) + } +} + +impl PartialEq for CxxString { + fn eq(&self, other: &String) -> bool { + self.eq(other.as_str()) + } +} + +impl Hash for CxxString { + fn hash(&self, state: &mut H) { + self.deref().hash(state); + } +} + +impl Borrow for CxxString { + fn borrow(&self) -> &CxxStr { + self.deref() + } +} + +/// Equivalent of a C++ `const std::string&`, which can be borrowed from a [`CxxString`]. +/// +/// `CxxStr` has the same conceptual relationship with `CxxString` as a Rust `&str` does with `String`. +#[repr(transparent)] +#[derive(Eq)] +pub struct CxxStr { + ptr: NonNull, +} + +impl CxxStr { + /// This is safe because the C api guarantees that `SWSSStrRef` is read-only. On `CxxString`, this would not be safe. + pub(crate) fn as_raw(&self) -> SWSSStrRef { + self.ptr.as_ptr() + } + + /// Length of the string, not including a null terminator. + pub fn len(&self) -> usize { + unsafe { SWSSStrRef_length(self.as_raw()).try_into().unwrap() } + } + + /// Returns `true` if `self` has a length of zero bytes. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// The underlying bytes of the string, not including a null terminator. + pub fn as_bytes(&self) -> &[u8] { + unsafe { + let data = SWSSStrRef_c_str(self.as_raw()); + slice::from_raw_parts(data as *const u8, self.len()) + } + } + + /// Tries to convert the C++ string to a Rust `&str` without copying. This can only be done if + /// the string contains valid UTF-8. See [std::str::from_utf8]. + pub fn to_str(&self) -> Result<&str, Utf8Error> { + std::str::from_utf8(self.as_bytes()) + } + + /// Converts the C++ string to a Rust `&str` or `String`. If the string is valid UTF-8, the + /// result is a `&str` pointing to the original data. Otherwise, the result is a `String` with + /// a copy of the data, but with invalid UTF-8 replaced. See [String::from_utf8_lossy]. + pub fn to_string_lossy(&self) -> Cow<'_, str> { + String::from_utf8_lossy(self.as_bytes()) + } +} + +impl ToOwned for CxxStr { + type Owned = CxxString; + + fn to_owned(&self) -> Self::Owned { + CxxString::new(self.as_bytes()) + } +} + +impl Debug for CxxStr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "\"{}\"", self.to_string_lossy()) + } +} + +impl PartialOrd for CxxStr { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for CxxStr { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.as_bytes().cmp(other.as_bytes()) + } +} + +impl Hash for CxxStr { + fn hash(&self, state: &mut H) { + self.as_bytes().hash(state); + } +} + +impl PartialEq for CxxStr { + fn eq(&self, other: &Self) -> bool { + self.as_bytes().eq(other.as_bytes()) + } +} + +impl PartialEq for CxxStr { + fn eq(&self, other: &CxxString) -> bool { + self.as_bytes().eq(other.as_bytes()) + } +} + +impl PartialEq for CxxStr { + fn eq(&self, other: &str) -> bool { + self.as_bytes().eq(other.as_bytes()) + } +} + +impl PartialEq<&str> for CxxStr { + fn eq(&self, other: &&str) -> bool { + self.eq(*other) + } +} + +impl PartialEq for CxxStr { + fn eq(&self, other: &String) -> bool { + self.eq(other.as_str()) + } +} + +/// SAFETY: `CxxStr` can never be obtained by value, only as a reference by dereferencing a +/// `CxxString`. Thus, the same logic as implementing `Sync` for `CxxString` applies. Mutation +/// is protected by `CxxString::into_raw` requiring an owned `self`. By the same logic, `Send` is +/// unnecessary to implement (you can never obtain a value to send). +unsafe impl Sync for CxxStr {} + +impl Serialize for CxxStr { + fn serialize(&self, serializer: S) -> Result { + match self.to_str() { + Ok(s) => serializer.serialize_str(s), + Err(_) => serializer.serialize_bytes(self.as_bytes()), + } + } +} diff --git a/crates/swss-common/src/types/dbconnector.rs b/crates/swss-common/src/types/dbconnector.rs new file mode 100644 index 0000000..152aded --- /dev/null +++ b/crates/swss-common/src/types/dbconnector.rs @@ -0,0 +1,249 @@ +use super::*; +use crate::bindings::*; +use std::collections::HashMap; + +/// Rust wrapper around `swss::DBConnector`. +#[derive(Debug)] +pub struct DbConnector { + pub(crate) ptr: SWSSDBConnector, + connection: DbConnectionInfo, +} + +/// Details about how a DbConnector is connected to Redis +#[derive(Debug, Clone)] +pub enum DbConnectionInfo { + Tcp { + hostname: String, + port: u16, + db_id: i32, + }, + Unix { + sock_path: String, + db_id: i32, + }, + Named { + db_name: String, + is_tcp_conn: bool, + }, + Keyed { + db_name: String, + is_tcp_conn: bool, + container_name: String, + netns: String, + }, +} + +impl DbConnector { + /// Create a new DbConnector from [`DbConnectionInfo`]. + /// + /// Timeout of 0 means block indefinitely. + pub fn new(connection: DbConnectionInfo, timeout_ms: u32) -> Result { + let ptr = match &connection { + DbConnectionInfo::Tcp { hostname, port, db_id } => { + let hostname = cstr(hostname); + unsafe { + swss_try!(p_db => SWSSDBConnector_new_tcp(*db_id, hostname.as_ptr(), *port, timeout_ms, p_db))? + } + } + DbConnectionInfo::Unix { sock_path, db_id } => { + let sock_path = cstr(sock_path); + unsafe { swss_try!(p_db => SWSSDBConnector_new_unix(*db_id, sock_path.as_ptr(), timeout_ms, p_db))? } + } + DbConnectionInfo::Named { db_name, is_tcp_conn } => { + let db_name = cstr(db_name); + unsafe { + swss_try!(p_db => SWSSDBConnector_new_named(db_name.as_ptr(), timeout_ms, *is_tcp_conn as u8, p_db))? + } + } + DbConnectionInfo::Keyed { + db_name, + is_tcp_conn, + container_name, + netns, + } => { + let db_name = cstr(db_name); + let container_name = cstr(container_name); + let netns = cstr(netns); + unsafe { + swss_try!(p_db => SWSSDBConnector_new_keyed(db_name.as_ptr(), timeout_ms, *is_tcp_conn as u8, + container_name.as_ptr(), netns.as_ptr(), p_db))? + } + } + }; + + Ok(Self { ptr, connection }) + } + + /// Create a DbConnector from a named entry in the SONiC db config. + /// + /// Timeout of 0 means block indefinitely. + pub fn new_named(db_name: impl Into, is_tcp_conn: bool, timeout_ms: u32) -> Result { + let db_name = db_name.into(); + Self::new(DbConnectionInfo::Named { db_name, is_tcp_conn }, timeout_ms) + } + + /// Create a DbConnector over a tcp socket. + /// + /// Timeout of 0 means block indefinitely. + pub fn new_tcp(db_id: i32, hostname: impl Into, port: u16, timeout_ms: u32) -> Result { + let hostname = hostname.into(); + Self::new(DbConnectionInfo::Tcp { hostname, port, db_id }, timeout_ms) + } + + /// Create a DbConnector over a unix socket. + /// + /// Timeout of 0 means block indefinitely. + pub fn new_unix(db_id: i32, sock_path: impl Into, timeout_ms: u32) -> Result { + let sock_path = sock_path.into(); + Self::new(DbConnectionInfo::Unix { sock_path, db_id }, timeout_ms) + } + + pub fn new_keyed( + db_name: impl Into, + is_tcp_conn: bool, + timeout_ms: u32, + container_name: impl Into, + netns: impl Into, + ) -> Result { + let db_name = db_name.into(); + let container_name = container_name.into(); + let netns = netns.into(); + Self::new( + DbConnectionInfo::Keyed { + db_name, + is_tcp_conn, + container_name, + netns, + }, + timeout_ms, + ) + } + + /// Clone a DbConnector with a timeout. + /// + /// Timeout of 0 means block indefinitely. + pub fn clone_timeout(&self, timeout_ms: u32) -> Result { + Self::new(self.connection.clone(), timeout_ms) + } + + pub fn connection(&self) -> &DbConnectionInfo { + &self.connection + } + + pub fn del(&self, key: &str) -> Result { + let key = cstr(key); + let status = unsafe { swss_try!(p_status => SWSSDBConnector_del(self.ptr, key.as_ptr(), p_status))? }; + Ok(status == 1) + } + + pub fn set(&self, key: &str, val: &CxxStr) -> Result<()> { + let key = cstr(key); + unsafe { swss_try!(SWSSDBConnector_set(self.ptr, key.as_ptr(), val.as_raw())) } + } + + pub fn get(&self, key: &str) -> Result> { + let key = cstr(key); + unsafe { + let ans = swss_try!(p_ans => SWSSDBConnector_get(self.ptr, key.as_ptr(), p_ans))?; + Ok(CxxString::take(ans)) + } + } + + pub fn exists(&self, key: &str) -> Result { + let key = cstr(key); + let status = unsafe { swss_try!(p_status => SWSSDBConnector_exists(self.ptr, key.as_ptr(), p_status))? }; + Ok(status == 1) + } + + pub fn hdel(&self, key: &str, field: &str) -> Result { + let key = cstr(key); + let field = cstr(field); + let status = + unsafe { swss_try!(p_status => SWSSDBConnector_hdel(self.ptr, key.as_ptr(), field.as_ptr(), p_status))? }; + Ok(status == 1) + } + + pub fn hset(&self, key: &str, field: &str, val: &CxxStr) -> Result<()> { + let key = cstr(key); + let field = cstr(field); + unsafe { + swss_try!(SWSSDBConnector_hset( + self.ptr, + key.as_ptr(), + field.as_ptr(), + val.as_raw(), + )) + } + } + + pub fn hget(&self, key: &str, field: &str) -> Result> { + let key = cstr(key); + let field = cstr(field); + unsafe { + let ans = swss_try!(p_ans => SWSSDBConnector_hget(self.ptr, key.as_ptr(), field.as_ptr(), p_ans))?; + Ok(CxxString::take(ans)) + } + } + + pub fn hgetall(&self, key: &str) -> Result> { + let key = cstr(key); + unsafe { + let arr = swss_try!(p_arr => SWSSDBConnector_hgetall(self.ptr, key.as_ptr(), p_arr))?; + Ok(take_field_value_array(arr)) + } + } + + pub fn hexists(&self, key: &str, field: &str) -> Result { + let key = cstr(key); + let field = cstr(field); + let status = unsafe { + swss_try!(p_status => SWSSDBConnector_hexists(self.ptr, key.as_ptr(), field.as_ptr(), p_status))? + }; + Ok(status == 1) + } + + pub fn flush_db(&self) -> Result { + let status = unsafe { swss_try!(p_status => SWSSDBConnector_flushdb(self.ptr, p_status))? }; + Ok(status == 1) + } +} + +impl Clone for DbConnector { + /// Clone with a default timeout of 15 seconds. + /// + /// 15 seconds was picked as an absurdly long time to wait for Redis to respond. + /// Panics after a timeout, or if any other exception occurred. + /// Use `clone_timeout` for control of timeout and exception handling. + fn clone(&self) -> Self { + self.clone_timeout(15000).expect("DbConnector::clone failed") + } +} + +impl Drop for DbConnector { + fn drop(&mut self) { + unsafe { swss_try!(SWSSDBConnector_free(self.ptr)).expect("Dropping DbConnector") }; + } +} + +unsafe impl Send for DbConnector {} + +#[cfg(feature = "async")] +impl DbConnector { + async_util::impl_basic_async_method!(new_async <= new(connection: DbConnectionInfo, timeout_ms: u32) -> Result); + async_util::impl_basic_async_method!(new_named_async <= new_named(db_name: &str, is_tcp_conn: bool, timeout_ms: u32) -> Result); + async_util::impl_basic_async_method!(new_tcp_async <= new_tcp(db_id: i32, hostname: &str, port: u16, timeout_ms: u32) -> Result); + async_util::impl_basic_async_method!(new_unix_async <= new_unix(db_id: i32, sock_path: &str, timeout_ms: u32) -> Result); + async_util::impl_basic_async_method!(new_keyed_async <= new_keyed(db_name: &str, is_tcp_conn: bool, timeout_ms: u32, container_name: &str, netns: &str) -> Result); + async_util::impl_basic_async_method!(clone_timeout_async <= clone_timeout(&self, timeout_ms: u32) -> Result); + async_util::impl_basic_async_method!(del_async <= del(&self, key: &str) -> Result); + async_util::impl_basic_async_method!(set_async <= set(&self, key: &str, value: &CxxStr) -> Result<()>); + async_util::impl_basic_async_method!(get_async <= get(&self, key: &str) -> Result>); + async_util::impl_basic_async_method!(exists_async <= exists(&self, key: &str) -> Result); + async_util::impl_basic_async_method!(hdel_async <= hdel(&self, key: &str, field: &str) -> Result); + async_util::impl_basic_async_method!(hset_async <= hset(&self, key: &str, field: &str, value: &CxxStr) -> Result<()>); + async_util::impl_basic_async_method!(hget_async <= hget(&self, key: &str, field: &str) -> Result>); + async_util::impl_basic_async_method!(hgetall_async <= hgetall(&self, key: &str) -> Result>); + async_util::impl_basic_async_method!(hexists_async <= hexists(&self, key: &str, field: &str) -> Result); + async_util::impl_basic_async_method!(flush_db_async <= flush_db(&self) -> Result); + async_util::impl_basic_async_method!(clone_async <= clone(&self) -> Self); +} diff --git a/crates/swss-common/src/types/exception.rs b/crates/swss-common/src/types/exception.rs new file mode 100644 index 0000000..bc7a611 --- /dev/null +++ b/crates/swss-common/src/types/exception.rs @@ -0,0 +1,79 @@ +use super::*; +use crate::bindings::*; + +/// Rust version of an `SWSSResult`. +/// +/// When an `SWSSResult` is success/`SWSSException_None`, the rust function will return `Ok(..)`. +/// Otherwise, the rust function will return `Err(Exception)`. +pub type Result = std::result::Result; + +/// "Try" an `SWSSResult`, or a call that produces an `SWSSResult`, and convert it into a rust `Result`. +macro_rules! swss_try { + // Convert an SWSSResult to a Result<(), Exception> + ($result:expr) => {{ + let res = $result; + if res.exception == $crate::bindings::SWSSException_SWSSException_None { + Ok(()) + } else { + Err($crate::types::Exception::take(res)) + } + }}; + + // Call a function that takes an output pointer, then convert its result to a Result<(), Exception> + ($out_ptr:ident => $result:expr) => {{ + let mut out = ::std::mem::MaybeUninit::uninit(); + let $out_ptr = out.as_mut_ptr(); + let res = $result; + if res.exception == $crate::bindings::SWSSException_SWSSException_None { + Ok(out.assume_init()) + } else { + Err($crate::types::Exception::take(res)) + } + }}; +} +pub(crate) use swss_try; + +/// Rust version of the body of a failed `SWSSResult`. +#[derive(Debug, Clone)] +pub struct Exception { + message: String, + location: String, +} + +impl Exception { + /// Construct an `Exception` from the details of a failed `SWSSResult`. + pub(crate) unsafe fn take(res: SWSSResult) -> Self { + assert_ne!( + res.exception, SWSSException_SWSSException_None, + "Exception::take() on non-exception SWSSResult: {res:#?}" + ); + Self { + message: CxxString::take(res.message) + .expect("SWSSResult missing message") + .to_string_lossy() + .into_owned(), + location: CxxString::take(res.location) + .expect("SWSSResult missing location") + .to_string_lossy() + .into_owned(), + } + } + + /// Get an informational string about the error that occurred. + pub fn message(&self) -> &str { + &self.message + } + + /// Get an informational string about the where in the code the error occurred. + pub fn location(&self) -> &str { + &self.location + } +} + +impl Display for Exception { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "[{}] {}", self.location, self.message) + } +} + +impl Error for Exception {} diff --git a/crates/swss-common/src/types/logger.rs b/crates/swss-common/src/types/logger.rs new file mode 100644 index 0000000..6555ec8 --- /dev/null +++ b/crates/swss-common/src/types/logger.rs @@ -0,0 +1,92 @@ +use super::*; +use getset::{Getters, Setters}; +use lazy_static::lazy_static; +use std::ffi::c_char; +use std::sync::Mutex; + +lazy_static! { + static ref LOGGER: Mutex = Mutex::new(Logger::default()); +} + +pub trait LoggerConfigChangeHandler: Send { + fn on_log_level_change(&mut self, level: &str); + fn on_log_output_change(&mut self, output: &str); +} + +#[derive(Getters, Setters)] +pub struct Logger { + #[getset(get = "pub", set)] + level: String, + #[getset(get = "pub", set)] + output: String, + #[getset(skip)] + handler: Option>, +} + +impl Default for Logger { + fn default() -> Self { + Self { + level: "INFO".to_string(), + output: "STDOUT".to_string(), + handler: None, + } + } +} + +pub fn link_to_swsscommon_logger(db_name: &str, logger_change_handler: T) -> Result<()> +where + T: LoggerConfigChangeHandler + 'static, +{ + { + let mut logger = LOGGER.lock().unwrap(); + logger.handler = Some(Box::new(logger_change_handler)); + } + + let db_name = cstr(db_name); + let def_level = cstr("INFO"); + let def_output = cstr("SYSLOG"); + // link to config db using the specified db_name. It will trigger immediate + // callback to set the current log level and output + unsafe { + swss_try!(SWSSLogger_linkToDbWithOutput( + db_name.as_ptr(), + Some(priority_change_notify), + def_level.as_ptr(), + Some(output_change_notify), + def_output.as_ptr(), + ))?; + // Start logger thread to handle log update from config db + swss_try!(SWSSLogger_restartLogger())?; + Ok(()) + } +} + +extern "C" fn priority_change_notify(_component: *const c_char, priority: *const c_char) { + let priority = unsafe { CStr::from_ptr(priority) }; + let priority = priority.to_str().unwrap(); + + let mut logger = LOGGER.lock().unwrap(); + logger.handler.as_mut().unwrap().as_mut().on_log_level_change(priority); + + logger.set_level(priority.to_string()); + + println!("Priority set to: {priority}"); +} + +extern "C" fn output_change_notify(_component: *const c_char, output: *const c_char) { + let output = unsafe { CStr::from_ptr(output) }; + let output = output.to_str().unwrap(); + + let mut logger = LOGGER.lock().unwrap(); + logger.handler.as_mut().unwrap().as_mut().on_log_output_change(output); + logger.set_output(output.to_string()); + println!("Output set to: {output}"); +} + +pub fn log_level() -> String { + LOGGER.lock().unwrap().level().to_string() +} + +pub fn log_output() -> String { + LOGGER.lock().unwrap().output().to_string() +} diff --git a/crates/swss-common/src/types/producerstatetable.rs b/crates/swss-common/src/types/producerstatetable.rs new file mode 100644 index 0000000..6bfbbe6 --- /dev/null +++ b/crates/swss-common/src/types/producerstatetable.rs @@ -0,0 +1,78 @@ +use super::*; +use crate::bindings::*; + +/// Rust wrapper around `swss::ProducerStateTable`. +#[derive(Debug)] +pub struct ProducerStateTable { + ptr: SWSSProducerStateTable, + _db: DbConnector, +} + +impl ProducerStateTable { + pub fn new(db: DbConnector, table_name: &str) -> Result { + let table_name = cstr(table_name); + let ptr = unsafe { swss_try!(p_pst => SWSSProducerStateTable_new(db.ptr, table_name.as_ptr(), p_pst))? }; + Ok(Self { ptr, _db: db }) + } + + pub fn set_buffered(&self, buffered: bool) -> Result<()> { + unsafe { swss_try!(SWSSProducerStateTable_setBuffered(self.ptr, buffered as u8)) } + } + + pub fn set(&self, key: &str, fvs: I) -> Result<()> + where + I: IntoIterator, + F: AsRef<[u8]>, + V: Into, + { + let key = cstr(key); + let (arr, _k) = make_field_value_array(fvs); + unsafe { swss_try!(SWSSProducerStateTable_set(self.ptr, key.as_ptr(), arr)) } + } + + pub fn del(&self, key: &str) -> Result<()> { + let key = cstr(key); + unsafe { swss_try!(SWSSProducerStateTable_del(self.ptr, key.as_ptr())) } + } + + pub fn flush(&self) -> Result<()> { + unsafe { swss_try!(SWSSProducerStateTable_flush(self.ptr)) } + } + + pub fn count(&self) -> Result { + unsafe { swss_try!(p_count => SWSSProducerStateTable_count(self.ptr, p_count)) } + } + + pub fn clear(&self) -> Result<()> { + unsafe { swss_try!(SWSSProducerStateTable_clear(self.ptr)) } + } + + pub fn create_temp_view(&self) -> Result<()> { + unsafe { swss_try!(SWSSProducerStateTable_create_temp_view(self.ptr)) } + } + + pub fn apply_temp_view(&self) -> Result<()> { + unsafe { swss_try!(SWSSProducerStateTable_apply_temp_view(self.ptr)) } + } +} + +impl Drop for ProducerStateTable { + fn drop(&mut self) { + unsafe { swss_try!(SWSSProducerStateTable_free(self.ptr)).expect("Dropping ProducerStateTable") }; + } +} + +unsafe impl Send for ProducerStateTable {} + +#[cfg(feature = "async")] +impl ProducerStateTable { + async_util::impl_basic_async_method!( + set_async <= set(&self, key: &str, fvs: I) -> Result<()> + where + I: IntoIterator + Send, + F: AsRef<[u8]>, + V: Into, + ); + async_util::impl_basic_async_method!(del_async <= del(&self, key: &str) -> Result<()>); + async_util::impl_basic_async_method!(flush_async <= flush(&self) -> Result<()>); +} diff --git a/crates/swss-common/src/types/subscriberstatetable.rs b/crates/swss-common/src/types/subscriberstatetable.rs new file mode 100644 index 0000000..c72eae3 --- /dev/null +++ b/crates/swss-common/src/types/subscriberstatetable.rs @@ -0,0 +1,85 @@ +use super::*; +use crate::bindings::*; +use std::{os::fd::BorrowedFd, ptr::null, time::Duration}; + +/// Rust wrapper around `swss::SubscriberStateTable`. +#[derive(Debug)] +pub struct SubscriberStateTable { + ptr: SWSSSubscriberStateTable, + db: DbConnector, + table_name: String, +} + +impl SubscriberStateTable { + pub fn new(db: DbConnector, table_name: &str, pop_batch_size: Option, pri: Option) -> Result { + let table_name_string = String::from(table_name); + let table_name = cstr(table_name); + let pop_batch_size = pop_batch_size.as_ref().map(|n| n as *const i32).unwrap_or(null()); + let pri = pri.as_ref().map(|n| n as *const i32).unwrap_or(null()); + let ptr = unsafe { + swss_try!(p_sst => { + SWSSSubscriberStateTable_new(db.ptr, table_name.as_ptr(), pop_batch_size, pri, p_sst) + })? + }; + Ok(Self { + ptr, + db, + table_name: table_name_string, + }) + } + + pub fn pops(&self) -> Result> { + unsafe { + let arr = swss_try!(p_arr => SWSSSubscriberStateTable_pops(self.ptr, p_arr))?; + Ok(take_key_op_field_values_array(arr)) + } + } + + pub fn read_data(&self, timeout: Duration, interrupt_on_signal: bool) -> Result { + let timeout_ms = timeout.as_millis().try_into().unwrap(); + let res = unsafe { + swss_try!(p_res => { + SWSSSubscriberStateTable_readData(self.ptr, timeout_ms, interrupt_on_signal as u8, p_res) + })? + }; + Ok(SelectResult::from_raw(res)) + } + + pub fn get_fd(&self) -> Result { + // SAFETY: This fd represents the underlying redis connection, which should stay alive + // as long as the DbConnector does. + unsafe { + let fd = swss_try!(p_fd => SWSSSubscriberStateTable_getFd(self.ptr, p_fd))?; + let fd = BorrowedFd::borrow_raw(fd.try_into().unwrap()); + Ok(fd) + } + } + + pub fn db_connector(&self) -> &DbConnector { + &self.db + } + + pub fn db_connector_mut(&mut self) -> &mut DbConnector { + &mut self.db + } + + pub fn table_name(&self) -> &str { + &self.table_name + } +} + +impl Drop for SubscriberStateTable { + fn drop(&mut self) { + unsafe { swss_try!(SWSSSubscriberStateTable_free(self.ptr)).expect("Dropping SubscriberStateTable") }; + } +} + +unsafe impl Send for SubscriberStateTable {} + +/// Async versions of methods +#[cfg(feature = "async")] +impl SubscriberStateTable { + async_util::impl_read_data_async!(); + async_util::impl_basic_async_method!(new_async <= new(db: DbConnector, table_name: &str, pop_batch_size: Option, pri: Option) -> Result); + async_util::impl_basic_async_method!(pops_async <= pops(&self) -> Result>); +} diff --git a/crates/swss-common/src/types/table.rs b/crates/swss-common/src/types/table.rs new file mode 100644 index 0000000..6aec3af --- /dev/null +++ b/crates/swss-common/src/types/table.rs @@ -0,0 +1,119 @@ +use super::*; +use crate::bindings::*; +use std::ptr; + +/// Rust wrapper around `swss::Table`. +#[derive(Debug)] +pub struct Table { + name: String, + ptr: SWSSTable, + _db: DbConnector, +} + +impl Table { + pub fn new(db: DbConnector, table_name: &str) -> Result { + let table_name_copy = table_name.to_string(); + let table_name = cstr(table_name); + let ptr = unsafe { swss_try!(p_tbl => SWSSTable_new(db.ptr, table_name.as_ptr(), p_tbl))? }; + Ok(Self { + name: table_name_copy, + ptr, + _db: db, + }) + } + + pub fn get(&self, key: &str) -> Result> { + let key = cstr(key); + let mut arr = SWSSFieldValueArray { + len: 0, + data: ptr::null_mut(), + }; + let exists = unsafe { swss_try!(p_exists => SWSSTable_get(self.ptr, key.as_ptr(), &mut arr, p_exists))? }; + let maybe_fvs = if exists == 1 { + Some(unsafe { take_field_value_array(arr) }) + } else { + None + }; + Ok(maybe_fvs) + } + + pub fn hget(&self, key: &str, field: &str) -> Result> { + let key = cstr(key); + let field = cstr(field); + let mut val: SWSSString = ptr::null_mut(); + let exists = unsafe { + swss_try!(p_exists => SWSSTable_hget(self.ptr, key.as_ptr(), field.as_ptr(), &mut val, p_exists))? + }; + let maybe_fvs = if exists == 1 { + Some(unsafe { CxxString::take(val).unwrap() }) + } else { + None + }; + Ok(maybe_fvs) + } + + pub fn set(&self, key: &str, fvs: I) -> Result<()> + where + I: IntoIterator, + F: AsRef<[u8]>, + V: Into, + { + let key = cstr(key); + let (arr, _k) = make_field_value_array(fvs); + unsafe { swss_try!(SWSSTable_set(self.ptr, key.as_ptr(), arr)) } + } + + pub fn hset(&self, key: &str, field: &str, val: &CxxStr) -> Result<()> { + let key = cstr(key); + let field = cstr(field); + unsafe { swss_try!(SWSSTable_hset(self.ptr, key.as_ptr(), field.as_ptr(), val.as_raw())) } + } + + pub fn del(&self, key: &str) -> Result<()> { + let key = cstr(key); + unsafe { swss_try!(SWSSTable_del(self.ptr, key.as_ptr())) } + } + + pub fn hdel(&self, key: &str, field: &str) -> Result<()> { + let key = cstr(key); + let field = cstr(field); + unsafe { swss_try!(SWSSTable_hdel(self.ptr, key.as_ptr(), field.as_ptr())) } + } + + pub fn get_keys(&self) -> Result> { + unsafe { + let arr = swss_try!(p_arr => SWSSTable_getKeys(self.ptr, p_arr))?; + Ok(take_string_array(arr)) + } + } + + pub fn get_name(&self) -> &str { + &self.name + } +} + +impl Drop for Table { + fn drop(&mut self) { + unsafe { swss_try!(SWSSTable_free(self.ptr)).expect("Dropping Table") }; + } +} + +unsafe impl Send for Table {} + +#[cfg(feature = "async")] +impl Table { + async_util::impl_basic_async_method!(new_async <= new(db: DbConnector, table_name: &str) -> Result); + async_util::impl_basic_async_method!(get_async <= get(&self, key: &str) -> Result>); + async_util::impl_basic_async_method!(hget_async <= hget(&self, key: &str, field: &str) -> Result>); + async_util::impl_basic_async_method!( + set_async <= set(&self, key: &str, fvs: I) -> Result<()> + where + I: IntoIterator + Send, + F: AsRef<[u8]>, + V: Into, + ); + async_util::impl_basic_async_method!(hset_async <= hset(&self, key: &str, field: &str, value: &CxxStr) -> Result<()>); + async_util::impl_basic_async_method!(del_async <= del(&self, key: &str) -> Result<()>); + async_util::impl_basic_async_method!(hdel_async <= hdel(&self, key: &str, field: &str) -> Result<()>); + async_util::impl_basic_async_method!(get_keys_async <= get_keys(&self) -> Result>); +} diff --git a/crates/swss-common/src/types/zmqclient.rs b/crates/swss-common/src/types/zmqclient.rs new file mode 100644 index 0000000..489c10e --- /dev/null +++ b/crates/swss-common/src/types/zmqclient.rs @@ -0,0 +1,61 @@ +use super::*; +use crate::bindings::*; + +/// Rust wrapper around `swss::ZmqClient`. +#[derive(Debug)] +pub struct ZmqClient { + pub(crate) ptr: SWSSZmqClient, +} + +impl ZmqClient { + pub fn new(endpoint: &str) -> Result { + let endpoint = cstr(endpoint); + let ptr = unsafe { swss_try!(p_zc => SWSSZmqClient_new(endpoint.as_ptr(), p_zc))? }; + Ok(Self { ptr }) + } + + pub fn is_connected(&self) -> Result { + let status = unsafe { swss_try!(p_status => SWSSZmqClient_isConnected(self.ptr, p_status))? }; + Ok(status == 1) + } + + pub fn connect(&self) -> Result<()> { + unsafe { swss_try!(SWSSZmqClient_connect(self.ptr)) } + } + + pub fn send_msg(&self, db_name: &str, table_name: &str, kfvs: I) -> Result<()> + where + I: IntoIterator, + { + let db_name = cstr(db_name); + let table_name = cstr(table_name); + let (kfvs, _k) = make_key_op_field_values_array(kfvs); + unsafe { + swss_try!(SWSSZmqClient_sendMsg( + self.ptr, + db_name.as_ptr(), + table_name.as_ptr(), + kfvs, + )) + } + } +} + +impl Drop for ZmqClient { + fn drop(&mut self) { + unsafe { swss_try!(SWSSZmqClient_free(self.ptr)).expect("Dropping ZmqClient") }; + } +} + +unsafe impl Send for ZmqClient {} + +#[cfg(feature = "async")] +impl ZmqClient { + async_util::impl_basic_async_method!(new_async <= new(endpoint: &str) -> Result); + async_util::impl_basic_async_method!(connect_async <= connect(&self) -> Result<()>); + async_util::impl_basic_async_method!( + send_msg_async <= send_msg(&self, db_name: &str, table_name: &str, kfvs: I) -> Result<()> + where + I: IntoIterator + Send, + ); +} diff --git a/crates/swss-common/src/types/zmqconsumerstatetable.rs b/crates/swss-common/src/types/zmqconsumerstatetable.rs new file mode 100644 index 0000000..0488b76 --- /dev/null +++ b/crates/swss-common/src/types/zmqconsumerstatetable.rs @@ -0,0 +1,92 @@ +use super::*; +use crate::bindings::*; +use std::{os::fd::BorrowedFd, ptr::null, sync::Arc, time::Duration}; + +/// Rust wrapper around `swss::ZmqConsumerStateTable`. +#[derive(Debug)] +pub struct ZmqConsumerStateTable { + ptr: SWSSZmqConsumerStateTable, + _db: DbConnector, + + /// See [`DropGuard`] and [`ZmqServer`]. + _drop_guard: Arc, +} + +impl ZmqConsumerStateTable { + pub fn new( + db: DbConnector, + table_name: &str, + zmqs: &mut ZmqServer, + pop_batch_size: Option, + pri: Option, + ) -> Result { + let table_name = cstr(table_name); + let pop_batch_size = pop_batch_size.as_ref().map(|n| n as *const i32).unwrap_or(null()); + let pri = pri.as_ref().map(|n| n as *const i32).unwrap_or(null()); + let ptr = unsafe { + swss_try!(p_zs => { + SWSSZmqConsumerStateTable_new(db.ptr, table_name.as_ptr(), zmqs.ptr, pop_batch_size, pri, p_zs) + })? + }; + let drop_guard = Arc::new(DropGuard(ptr)); + zmqs.register_consumer_state_table(drop_guard.clone()); + Ok(Self { + ptr, + _db: db, + _drop_guard: drop_guard, + }) + } + + pub fn pops(&self) -> Result> { + unsafe { + let arr = swss_try!(p_arr => SWSSZmqConsumerStateTable_pops(self.ptr, p_arr))?; + Ok(take_key_op_field_values_array(arr)) + } + } + + pub fn get_fd(&self) -> Result { + // SAFETY: This fd represents the underlying zmq socket, which should remain alive as long as there + // is a listener (i.e. a ZmqConsumerStateTable) + unsafe { + let fd = swss_try!(p_fd => SWSSZmqConsumerStateTable_getFd(self.ptr, p_fd))?; + let fd = BorrowedFd::borrow_raw(fd.try_into().unwrap()); + Ok(fd) + } + } + + pub fn read_data(&self, timeout: Duration, interrupt_on_signal: bool) -> Result { + let timeout_ms = timeout.as_millis().try_into().unwrap(); + let res = unsafe { + swss_try!(p_res => + SWSSZmqConsumerStateTable_readData(self.ptr, timeout_ms, interrupt_on_signal as u8, p_res) + )? + }; + Ok(SelectResult::from_raw(res)) + } +} + +unsafe impl Send for ZmqConsumerStateTable {} + +/// A type that will free the underlying `ZmqConsumerStateTable` when it is dropped. +/// This is shared with `ZmqServer` +#[derive(Debug)] +pub(crate) struct DropGuard(SWSSZmqConsumerStateTable); + +impl Drop for DropGuard { + fn drop(&mut self) { + unsafe { swss_try!(SWSSZmqConsumerStateTable_free(self.0)).expect("Dropping ZmqConsumerStateTable") }; + } +} + +// SAFETY: This is safe as long as ZmqConsumerStateTable is Send +unsafe impl Send for DropGuard {} + +// SAFETY: There is no way to use &DropGuard so it is safe +unsafe impl Sync for DropGuard {} + +/// Async versions of methods +#[cfg(feature = "async")] +impl ZmqConsumerStateTable { + async_util::impl_read_data_async!(); + async_util::impl_basic_async_method!(pops_async <= pops(&self) -> Result>); +} diff --git a/crates/swss-common/src/types/zmqproducerstatetable.rs b/crates/swss-common/src/types/zmqproducerstatetable.rs new file mode 100644 index 0000000..9ee69f3 --- /dev/null +++ b/crates/swss-common/src/types/zmqproducerstatetable.rs @@ -0,0 +1,67 @@ +use super::*; +use crate::bindings::*; + +/// Rust wrapper around `swss::ZmqProducerStateTable`. +#[derive(Debug)] +pub struct ZmqProducerStateTable { + ptr: SWSSZmqProducerStateTable, + _db: DbConnector, + _zmqc: ZmqClient, +} + +impl ZmqProducerStateTable { + pub fn new(db: DbConnector, table_name: &str, zmqc: ZmqClient, db_persistence: bool) -> Result { + let table_name = cstr(table_name); + let db_persistence = db_persistence as u8; + let ptr = unsafe { + swss_try!(p_zpst => + SWSSZmqProducerStateTable_new(db.ptr, table_name.as_ptr(), zmqc.ptr, db_persistence, p_zpst) + )? + }; + Ok(Self { + ptr, + _db: db, + _zmqc: zmqc, + }) + } + + pub fn set(&self, key: &str, fvs: I) -> Result<()> + where + I: IntoIterator, + F: AsRef<[u8]>, + V: Into, + { + let key = cstr(key); + let (arr, _k) = make_field_value_array(fvs); + unsafe { swss_try!(SWSSZmqProducerStateTable_set(self.ptr, key.as_ptr(), arr)) } + } + + pub fn del(&self, key: &str) -> Result<()> { + let key = cstr(key); + unsafe { swss_try!(SWSSZmqProducerStateTable_del(self.ptr, key.as_ptr())) } + } + + pub fn db_updater_queue_size(&self) -> Result { + unsafe { swss_try!(p_size => SWSSZmqProducerStateTable_dbUpdaterQueueSize(self.ptr, p_size)) } + } +} + +impl Drop for ZmqProducerStateTable { + fn drop(&mut self) { + unsafe { SWSSZmqProducerStateTable_free(self.ptr) }; + } +} + +unsafe impl Send for ZmqProducerStateTable {} + +#[cfg(feature = "async")] +impl ZmqProducerStateTable { + async_util::impl_basic_async_method!( + set_async <= set(&self, key: &str, fvs: I) -> Result<()> + where + I: IntoIterator + Send, + F: AsRef<[u8]>, + V: Into, + ); + async_util::impl_basic_async_method!(del_async <= del(&self, key: &str) -> Result<()>); +} diff --git a/crates/swss-common/src/types/zmqserver.rs b/crates/swss-common/src/types/zmqserver.rs new file mode 100644 index 0000000..a8c7aeb --- /dev/null +++ b/crates/swss-common/src/types/zmqserver.rs @@ -0,0 +1,39 @@ +use super::*; +use crate::bindings::*; +use std::sync::Arc; + +/// Rust wrapper around `swss::ZmqServer`. +#[derive(Debug)] +pub struct ZmqServer { + pub(crate) ptr: SWSSZmqServer, + + /// The types that register message handlers with a ZmqServer must be kept alive until + /// the server thread dies, otherwise we risk the server thread calling methods on deleted objects. + /// + /// Currently this is just ZmqConsumerStateTable, but in the future there may be other types added + /// and this vec will need to hold an enum of the possible message handlers. + message_handler_guards: Vec>, +} + +impl ZmqServer { + pub fn new(endpoint: &str) -> Result { + let endpoint = cstr(endpoint); + let ptr = unsafe { swss_try!(p_zs => SWSSZmqServer_new(endpoint.as_ptr(), p_zs))? }; + Ok(Self { + ptr, + message_handler_guards: Vec::new(), + }) + } + + pub(crate) fn register_consumer_state_table(&mut self, tbl_dg: Arc) { + self.message_handler_guards.push(tbl_dg); + } +} + +impl Drop for ZmqServer { + fn drop(&mut self) { + unsafe { SWSSZmqServer_free(self.ptr) }; + } +} + +unsafe impl Send for ZmqServer {} diff --git a/crates/swss-common/tests/async.rs b/crates/swss-common/tests/async.rs new file mode 100644 index 0000000..1c23d44 --- /dev/null +++ b/crates/swss-common/tests/async.rs @@ -0,0 +1,220 @@ +#![cfg(feature = "async")] +use paste::paste; +use std::{collections::HashMap, time::Duration}; +use swss_common::*; +use swss_common_testing::*; + +async fn timeout(timeout_ms: u32, fut: F) -> F::Output { + tokio::time::timeout(Duration::from_millis(timeout_ms.into()), fut) + .await + .expect("timed out") +} + +// This macro verifies that async test functions are Send. tokio::test is a bit misleading because +// a tokio runtime's root future can be !Send. This takes a test function and defines two tests from +// it - one that is the root future (Send not required), and one that uses a type assertion to check +// that the tested future is Send. +macro_rules! define_tokio_test_fns { + ($f:ident) => { + paste! { + #[tokio::test] + async fn [< $f _ >]() { + $f().await.unwrap(); + } + + fn [< _assert_ $f _is_send >]() { + fn assert_is_send(_: T) {} + assert_is_send($f()); + } + } + }; +} + +define_tokio_test_fns!(dbconnector_async_api_basic_test); +async fn dbconnector_async_api_basic_test() -> Result<(), Exception> { + let redis = Redis::start(); + let mut db = redis.db_connector(); + + drop(db.clone_timeout_async(10000).await?); + + assert!(db.flush_db_async().await?); + + let random = random_cxx_string(); + + db.set_async("hello", &CxxString::new("hello, world!")).await?; + db.set_async("random", &random).await?; + assert_eq!(db.get_async("hello").await?.unwrap(), "hello, world!"); + assert_eq!(db.get_async("random").await?.unwrap(), random); + assert_eq!(db.get_async("noexist").await?, None); + + assert!(db.exists_async("hello").await?); + assert!(!db.exists_async("noexist").await?); + assert!(db.del_async("hello").await?); + assert!(!db.del_async("hello").await?); + assert!(db.del_async("random").await?); + assert!(!db.del_async("random").await?); + assert!(!db.del_async("noexist").await?); + + db.hset_async("a", "hello", &CxxString::new("hello, world!")).await?; + db.hset_async("a", "random", &random).await?; + assert_eq!(db.hget_async("a", "hello").await?.unwrap(), "hello, world!"); + assert_eq!(db.hget_async("a", "random").await?.unwrap(), random); + assert_eq!(db.hget_async("a", "noexist").await?, None); + assert_eq!(db.hget_async("noexist", "noexist").await?, None); + assert!(db.hexists_async("a", "hello").await?); + assert!(!db.hexists_async("a", "noexist").await?); + assert!(!db.hexists_async("noexist", "hello").await?); + assert!(db.hdel_async("a", "hello").await?); + assert!(!db.hdel_async("a", "hello").await?); + assert!(db.hdel_async("a", "random").await?); + assert!(!db.hdel_async("a", "random").await?); + assert!(!db.hdel_async("a", "noexist").await?); + assert!(!db.hdel_async("noexist", "noexist").await?); + assert!(!db.del_async("a").await?); + + assert!(db.hgetall_async("a").await?.is_empty()); + db.hset_async("a", "a", &CxxString::new("1")).await?; + db.hset_async("a", "b", &CxxString::new("2")).await?; + db.hset_async("a", "c", &CxxString::new("3")).await?; + assert_eq!( + db.hgetall_async("a").await?, + HashMap::from_iter([ + ("a".into(), "1".into()), + ("b".into(), "2".into()), + ("c".into(), "3".into()) + ]) + ); + + assert!(db.flush_db_async().await?); + + Ok(()) +} + +define_tokio_test_fns!(consumer_producer_state_tables_async_api_basic_test); +async fn consumer_producer_state_tables_async_api_basic_test() -> Result<(), Exception> { + let redis = Redis::start(); + let mut pst = ProducerStateTable::new(redis.db_connector(), "table_a")?; + let mut cst = ConsumerStateTable::new(redis.db_connector(), "table_a", None, None)?; + + assert!(cst.pops()?.is_empty()); + + let mut kfvs = random_kfvs(); + for (i, kfv) in kfvs.iter().enumerate() { + assert_eq!(pst.count()?, i as i64); + match kfv.operation { + KeyOperation::Set => pst.set_async(&kfv.key, kfv.field_values.clone()).await?, + KeyOperation::Del => pst.del_async(&kfv.key).await?, + } + } + + timeout(2000, cst.read_data_async()).await.unwrap(); + let mut kfvs_cst = cst.pops()?; + assert!(cst.pops()?.is_empty()); + + kfvs.sort_unstable(); + kfvs_cst.sort_unstable(); + assert_eq!(kfvs_cst.len(), kfvs.len()); + assert_eq!(kfvs_cst, kfvs); + + Ok(()) +} + +define_tokio_test_fns!(subscriber_state_table_async_api_basic_test); +async fn subscriber_state_table_async_api_basic_test() -> Result<(), Exception> { + let redis = Redis::start(); + let mut db = redis.db_connector(); + let mut sst = SubscriberStateTable::new(redis.db_connector(), "table_a", None, None)?; + assert!(sst.pops()?.is_empty()); + + db.hset_async("table_a:key_a", "field_a", &CxxString::new("value_a")) + .await?; + db.hset_async("table_a:key_a", "field_b", &CxxString::new("value_b")) + .await?; + timeout(300, sst.read_data_async()).await.unwrap(); + let mut kfvs = sst.pops()?; + + // SubscriberStateTable will pick up duplicate KeyOpFieldValues' after two SETs on the same + // key. I'm not actually sure if this is intended. + assert_eq!(kfvs.len(), 2); + assert_eq!(kfvs[0], kfvs[1]); + + assert!(sst.pops()?.is_empty()); + + let KeyOpFieldValues { + key, + operation, + field_values, + } = kfvs.pop().unwrap(); + + assert_eq!(key, "key_a"); + assert_eq!(operation, KeyOperation::Set); + assert_eq!( + field_values, + HashMap::from_iter([ + ("field_a".into(), "value_a".into()), + ("field_b".into(), "value_b".into()) + ]) + ); + + Ok(()) +} + +define_tokio_test_fns!(zmq_consumer_producer_state_table_async_api_basic_test); +async fn zmq_consumer_producer_state_table_async_api_basic_test() -> Result<(), Exception> { + let (endpoint, _delete) = random_zmq_endpoint(); + let mut zmqs = ZmqServer::new(&endpoint)?; + let zmqc = ZmqClient::new(&endpoint)?; + + let redis = Redis::start(); + let mut zpst = ZmqProducerStateTable::new(redis.db_connector(), "table_a", zmqc, false)?; + let mut zcst = ZmqConsumerStateTable::new(redis.db_connector(), "table_a", &mut zmqs, None, None)?; + + let kfvs = random_kfvs(); + for kfv in &kfvs { + match kfv.operation { + KeyOperation::Set => zpst.set_async(&kfv.key, kfv.field_values.clone()).await?, + KeyOperation::Del => zpst.del_async(&kfv.key).await?, + } + } + + let mut kfvs_seen = Vec::new(); + while kfvs_seen.len() != kfvs.len() { + timeout(2000, zcst.read_data_async()).await.unwrap(); + kfvs_seen.extend(zcst.pops()?); + } + assert_eq!(kfvs, kfvs_seen); + + Ok(()) +} + +define_tokio_test_fns!(table_async_api_basic_test); +async fn table_async_api_basic_test() -> Result<(), Exception> { + let redis = Redis::start(); + let mut table = Table::new_async(redis.db_connector(), "mytable").await?; + assert!(table.get_keys_async().await?.is_empty()); + assert!(table.get_async("mykey").await?.is_none()); + + let fvs = random_fvs(); + table.set_async("mykey", fvs.clone()).await?; + assert_eq!(table.get_keys_async().await?, &["mykey"]); + assert_eq!(table.get_async("mykey").await?.as_ref(), Some(&fvs)); + + let (field, value) = fvs.iter().next().unwrap(); + assert_eq!(table.hget_async("mykey", field).await?.as_ref(), Some(value)); + table.hdel_async("mykey", field).await?; + assert_eq!(table.hget_async("mykey", field).await?, None); + + table + .hset_async("mykey", field, &CxxString::from("my special value")) + .await?; + assert_eq!( + table.hget_async("mykey", field).await?.unwrap().as_bytes(), + b"my special value" + ); + + table.del_async("mykey").await?; + assert!(table.get_keys_async().await?.is_empty()); + assert!(table.get_async("mykey").await?.is_none()); + + Ok(()) +} diff --git a/crates/swss-common/tests/logger.rs b/crates/swss-common/tests/logger.rs new file mode 100644 index 0000000..b27e1b2 --- /dev/null +++ b/crates/swss-common/tests/logger.rs @@ -0,0 +1,49 @@ +use lazy_static::lazy_static; +use std::sync::Mutex; +use std::thread::sleep; +use std::time::Duration; +use swss_common::*; +use swss_common_testing::*; + +lazy_static! { + static ref LEVEL: Mutex = Mutex::new("INFO".to_string()); + static ref OUTPUT: Mutex = Mutex::new("INFO".to_string()); +} +struct LoggerConfigHandlerForTest {} + +impl LoggerConfigChangeHandler for LoggerConfigHandlerForTest { + fn on_log_level_change(&mut self, level_str: &str) { + let mut level = LEVEL.lock().unwrap(); + *level = level_str.to_string(); + } + + fn on_log_output_change(&mut self, output_str: &str) { + let mut output = OUTPUT.lock().unwrap(); + *output = output_str.to_string(); + } +} +#[test] +fn logger_init_test() -> Result<(), Exception> { + let redis = Redis::start_config_db(); + // todo: change redis::db_connector to passing db_id as parameter + let db = DbConnector::new_unix(1, &redis.sock, 0)?; + + db.hset("LOGGER|test", "LOGLEVEL", &CxxString::new("NOTICE"))?; + db.hset("LOGGER|test", "LOGOUTPUT", &CxxString::new("SYSLOG"))?; + + // connect to swsscommon logger + let handler = LoggerConfigHandlerForTest {}; + link_to_swsscommon_logger("test", handler)?; + + assert_eq!("NOTICE", *LEVEL.lock().unwrap()); + assert_eq!("SYSLOG", *OUTPUT.lock().unwrap()); + + // test log level change + db.hset("LOGGER|test", "LOGLEVEL", &CxxString::new("CRIT"))?; + db.hset("LOGGER|test", "LOGOUTPUT", &CxxString::new("STDOUT"))?; + sleep(Duration::from_millis(500)); + + assert_eq!("CRIT", *LEVEL.lock().unwrap()); + assert_eq!("STDOUT", *OUTPUT.lock().unwrap()); + Ok(()) +} diff --git a/crates/swss-common/tests/logger_fallback.rs b/crates/swss-common/tests/logger_fallback.rs new file mode 100644 index 0000000..5e3a761 --- /dev/null +++ b/crates/swss-common/tests/logger_fallback.rs @@ -0,0 +1,22 @@ +use swss_common::*; + +struct LoggerConfigHandlerForTest {} + +impl LoggerConfigChangeHandler for LoggerConfigHandlerForTest { + #[allow(unused_variables)] + fn on_log_level_change(&mut self, level_str: &str) {} + + #[allow(unused_variables)] + fn on_log_output_change(&mut self, output_str: &str) {} +} + +// Test link_to_swsscommon_logger returning an error when Redis is not available. +// This test has to be run in a separate test suite because it can't run with other tests that use Redis. +#[test] +fn logger_init_without_redis() { + let handler = LoggerConfigHandlerForTest {}; + // connect to swsscommon logger + let result = link_to_swsscommon_logger("test", handler); + + assert!(result.is_err()); +} diff --git a/crates/swss-common/tests/sync.rs b/crates/swss-common/tests/sync.rs new file mode 100644 index 0000000..f366c7f --- /dev/null +++ b/crates/swss-common/tests/sync.rs @@ -0,0 +1,223 @@ +use std::{collections::HashMap, time::Duration}; +use swss_common::*; +use swss_common_testing::*; + +#[test] +fn dbconnector_sync_api_basic_test() -> Result<(), Exception> { + let redis = Redis::start(); + let db = redis.db_connector(); + + drop(db.clone_timeout(10000)); + + assert!(db.flush_db()?); + + let random = random_cxx_string(); + + db.set("hello", &CxxString::new("hello, world!"))?; + db.set("random", &random)?; + assert_eq!(db.get("hello")?.unwrap(), "hello, world!"); + assert_eq!(db.get("random")?.unwrap(), random); + assert_eq!(db.get("noexist")?, None); + + assert!(db.exists("hello")?); + assert!(!db.exists("noexist")?); + assert!(db.del("hello")?); + assert!(!db.del("hello")?); + assert!(db.del("random")?); + assert!(!db.del("random")?); + assert!(!db.del("noexist")?); + + db.hset("a", "hello", &CxxString::new("hello, world!"))?; + db.hset("a", "random", &random)?; + assert_eq!(db.hget("a", "hello")?.unwrap(), "hello, world!"); + assert_eq!(db.hget("a", "random")?.unwrap(), random); + assert_eq!(db.hget("a", "noexist")?, None); + assert_eq!(db.hget("noexist", "noexist")?, None); + assert!(db.hexists("a", "hello")?); + assert!(!db.hexists("a", "noexist")?); + assert!(!db.hexists("noexist", "hello")?); + assert!(db.hdel("a", "hello")?); + assert!(!db.hdel("a", "hello")?); + assert!(db.hdel("a", "random")?); + assert!(!db.hdel("a", "random")?); + assert!(!db.hdel("a", "noexist")?); + assert!(!db.hdel("noexist", "noexist")?); + assert!(!db.del("a")?); + + assert!(db.hgetall("a")?.is_empty()); + db.hset("a", "a", &CxxString::new("1"))?; + db.hset("a", "b", &CxxString::new("2"))?; + db.hset("a", "c", &CxxString::new("3"))?; + assert_eq!( + db.hgetall("a")?, + HashMap::from_iter([ + ("a".into(), "1".into()), + ("b".into(), "2".into()), + ("c".into(), "3".into()) + ]) + ); + + assert!(db.flush_db()?); + + Ok(()) +} + +#[test] +fn consumer_producer_state_tables_sync_api_basic_test() -> Result<(), Exception> { + sonic_db_config_init_for_test(); + let redis = Redis::start(); + let pst = ProducerStateTable::new(redis.db_connector(), "table_a")?; + let cst = ConsumerStateTable::new(redis.db_connector(), "table_a", None, None)?; + + assert!(cst.pops()?.is_empty()); + + let mut kfvs = random_kfvs(); + for (i, kfv) in kfvs.iter().enumerate() { + assert_eq!(pst.count()?, i as i64); + match kfv.operation { + KeyOperation::Set => pst.set(&kfv.key, kfv.field_values.clone())?, + KeyOperation::Del => pst.del(&kfv.key)?, + } + } + + assert_eq!(cst.read_data(Duration::from_millis(2000), true)?, SelectResult::Data); + let mut kfvs_cst = cst.pops()?; + assert!(cst.pops()?.is_empty()); + + kfvs.sort_unstable(); + kfvs_cst.sort_unstable(); + assert_eq!(kfvs_cst.len(), kfvs.len()); + assert_eq!(kfvs_cst, kfvs); + + Ok(()) +} + +#[test] +fn subscriber_state_table_sync_api_basic_test() -> Result<(), Exception> { + sonic_db_config_init_for_test(); + let redis = Redis::start(); + let db = redis.db_connector(); + let sst = SubscriberStateTable::new(redis.db_connector(), "table_a", None, None)?; + assert!(sst.pops()?.is_empty()); + + db.hset("table_a:key_a", "field_a", &CxxString::new("value_a"))?; + db.hset("table_a:key_a", "field_b", &CxxString::new("value_b"))?; + assert_eq!(sst.read_data(Duration::from_millis(300), true)?, SelectResult::Data); + let mut kfvs = sst.pops()?; + + // SubscriberStateTable will pick up duplicate KeyOpFieldValues' after two SETs on the same + // key. I'm not actually sure if this is intended. + assert_eq!(kfvs.len(), 2); + assert_eq!(kfvs[0], kfvs[1]); + + assert!(sst.pops()?.is_empty()); + + let KeyOpFieldValues { + key, + operation, + field_values, + } = kfvs.pop().unwrap(); + + assert_eq!(key, "key_a"); + assert_eq!(operation, KeyOperation::Set); + assert_eq!( + field_values, + HashMap::from_iter([ + ("field_a".into(), "value_a".into()), + ("field_b".into(), "value_b".into()) + ]) + ); + + Ok(()) +} + +#[test] +fn zmq_consumer_state_table_sync_api_basic_test() -> Result<(), Exception> { + use SelectResult::*; + + let (endpoint, _delete) = random_zmq_endpoint(); + let mut zmqs = ZmqServer::new(&endpoint)?; + let zmqc = ZmqClient::new(&endpoint)?; + assert!(zmqc.is_connected()?); + + let redis = Redis::start(); + let zcst_table_a = ZmqConsumerStateTable::new(redis.db_connector(), "table_a", &mut zmqs, None, None)?; + let zcst_table_b = ZmqConsumerStateTable::new(redis.db_connector(), "table_b", &mut zmqs, None, None)?; + + let kfvs = random_kfvs(); + + zmqc.send_msg("", "table_a", kfvs.clone())?; // db name is empty because we are using DbConnector::new_unix + assert_eq!(zcst_table_a.read_data(Duration::from_millis(1500), true)?, Data); + + zmqc.send_msg("", "table_b", kfvs.clone())?; + assert_eq!(zcst_table_b.read_data(Duration::from_millis(1500), true)?, Data); + + let kfvs_a = zcst_table_a.pops()?; + let kvfs_b = zcst_table_b.pops()?; + assert_eq!(kfvs_a, kvfs_b); + assert_eq!(kfvs, kfvs_a); + + Ok(()) +} + +#[test] +fn zmq_consumer_producer_state_tables_sync_api_basic_test() -> Result<(), Exception> { + use SelectResult::*; + + let (endpoint, _delete) = random_zmq_endpoint(); + let mut zmqs = ZmqServer::new(&endpoint)?; + let zmqc = ZmqClient::new(&endpoint)?; + + let redis = Redis::start(); + let zpst = ZmqProducerStateTable::new(redis.db_connector(), "table_a", zmqc, false)?; + let zcst = ZmqConsumerStateTable::new(redis.db_connector(), "table_a", &mut zmqs, None, None)?; + + let kfvs = random_kfvs(); + for kfv in &kfvs { + match kfv.operation { + KeyOperation::Set => zpst.set(&kfv.key, kfv.field_values.clone())?, + KeyOperation::Del => zpst.del(&kfv.key)?, + } + } + + let mut kfvs_seen = Vec::new(); + while kfvs_seen.len() != kfvs.len() { + assert_eq!(zcst.read_data(Duration::from_millis(2000), true)?, Data); + kfvs_seen.extend(zcst.pops()?); + } + assert_eq!(kfvs, kfvs_seen); + + Ok(()) +} + +#[test] +fn table_sync_api_basic_test() -> Result<(), Exception> { + let redis = Redis::start(); + let table = Table::new(redis.db_connector(), "mytable")?; + assert!(table.get_keys()?.is_empty()); + assert!(table.get("mykey")?.is_none()); + + let fvs = random_fvs(); + table.set("mykey", fvs.clone())?; + assert_eq!(table.get_keys()?, &["mykey"]); + assert_eq!(table.get("mykey")?.as_ref(), Some(&fvs)); + + let (field, value) = fvs.iter().next().unwrap(); + assert_eq!(table.hget("mykey", field)?.as_ref(), Some(value)); + table.hdel("mykey", field)?; + assert_eq!(table.hget("mykey", field)?, None); + + table.hset("mykey", field, &CxxString::from("my special value"))?; + assert_eq!(table.hget("mykey", field)?.unwrap().as_bytes(), b"my special value"); + + table.del("mykey")?; + assert!(table.get_keys()?.is_empty()); + assert!(table.get("mykey")?.is_none()); + + Ok(()) +} + +#[test] +fn expected_exceptions() { + DbConnector::new_tcp(0, "127.0.0.1", 1, 10000).unwrap_err(); +} diff --git a/debian/rules b/debian/rules index 785904b..928253b 100755 --- a/debian/rules +++ b/debian/rules @@ -44,3 +44,11 @@ endif override_dh_auto_configure: dh_auto_configure -- $(CONFIGURE_ARGS) $(DEB_CONFIGURE_EXTRA_FLAGS) # -DCMAKE_LIBRARY_PATH=$(DEB_HOST_MULTIARCH) + +override_dh_auto_build: + dh_auto_build + SWSS_COMMON_REPO=$(CURDIR) cargo build --release --all + +override_dh_auto_clean: + dh_auto_clean + cargo clean --release