From fb433efd7d4f4bbd60428679c27afb99bb566264 Mon Sep 17 00:00:00 2001 From: Sonic Build Admin Date: Thu, 31 Jul 2025 21:50:38 +0000 Subject: [PATCH] Move swss-common/swss-common-testing to sonic-swss-common repo Only merge this PR after merging https://github.com/sonic-net/sonic-swss-common/pull/1052 --- Cargo.lock | 10 +- Cargo.toml | 4 - crates/container/Cargo.toml | 2 +- crates/hamgrd/Cargo.toml | 4 +- crates/sonic-common/Cargo.toml | 2 +- crates/sonicdb-derive/Cargo.toml | 2 +- crates/swbus-actor/Cargo.toml | 4 +- crates/swbus-cli/Cargo.toml | 4 +- crates/swbus-config/Cargo.toml | 4 +- crates/swss-common-bridge/Cargo.toml | 4 +- crates/swss-common-testing/Cargo.toml | 17 - crates/swss-common-testing/src/lib.rs | 268 -------------- crates/swss-common/Cargo.toml | 31 -- crates/swss-common/build.rs | 54 --- crates/swss-common/src/lib.rs | 27 -- crates/swss-common/src/types.rs | 314 ----------------- crates/swss-common/src/types/async_util.rs | 46 --- .../src/types/consumerstatetable.rs | 82 ----- crates/swss-common/src/types/cxxstring.rs | 328 ------------------ crates/swss-common/src/types/dbconnector.rs | 249 ------------- crates/swss-common/src/types/exception.rs | 79 ----- crates/swss-common/src/types/logger.rs | 92 ----- .../src/types/producerstatetable.rs | 78 ----- .../src/types/subscriberstatetable.rs | 85 ----- crates/swss-common/src/types/table.rs | 119 ------- crates/swss-common/src/types/zmqclient.rs | 61 ---- .../src/types/zmqconsumerstatetable.rs | 92 ----- .../src/types/zmqproducerstatetable.rs | 67 ---- crates/swss-common/src/types/zmqserver.rs | 39 --- crates/swss-common/tests/async.rs | 220 ------------ crates/swss-common/tests/logger.rs | 49 --- crates/swss-common/tests/logger_fallback.rs | 22 -- crates/swss-common/tests/sync.rs | 264 -------------- crates/swss-serde/Cargo.toml | 4 +- 34 files changed, 17 insertions(+), 2710 deletions(-) delete mode 100644 crates/swss-common-testing/Cargo.toml delete mode 100644 crates/swss-common-testing/src/lib.rs delete mode 100644 crates/swss-common/Cargo.toml delete mode 100644 crates/swss-common/build.rs delete mode 100644 crates/swss-common/src/lib.rs delete mode 100644 crates/swss-common/src/types.rs delete mode 100644 crates/swss-common/src/types/async_util.rs delete mode 100644 crates/swss-common/src/types/consumerstatetable.rs delete mode 100644 crates/swss-common/src/types/cxxstring.rs delete mode 100644 crates/swss-common/src/types/dbconnector.rs delete mode 100644 crates/swss-common/src/types/exception.rs delete mode 100644 crates/swss-common/src/types/logger.rs delete mode 100644 crates/swss-common/src/types/producerstatetable.rs delete mode 100644 crates/swss-common/src/types/subscriberstatetable.rs delete mode 100644 crates/swss-common/src/types/table.rs delete mode 100644 crates/swss-common/src/types/zmqclient.rs delete mode 100644 crates/swss-common/src/types/zmqconsumerstatetable.rs delete mode 100644 crates/swss-common/src/types/zmqproducerstatetable.rs delete mode 100644 crates/swss-common/src/types/zmqserver.rs delete mode 100644 crates/swss-common/tests/async.rs delete mode 100644 crates/swss-common/tests/logger.rs delete mode 100644 crates/swss-common/tests/logger_fallback.rs delete mode 100644 crates/swss-common/tests/sync.rs diff --git a/Cargo.lock b/Cargo.lock index 4b6ffc5..6327102 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1425,12 +1425,6 @@ dependencies = [ "windows-targets 0.52.4", ] -[[package]] -name = "paste" -version = "1.0.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" - [[package]] name = "percent-encoding" version = "2.3.1" @@ -2129,14 +2123,13 @@ dependencies = [ [[package]] name = "swss-common" version = "0.1.0" +source = "git+https://github.com/sonic-net/sonic-swss-common.git?branch=master#93af9275a3c413b45a47e4fd7967f15e2ea13297" dependencies = [ "bindgen", "getset", "lazy_static", "libc", - "paste", "serde", - "swss-common-testing", "tokio", "tracing-subscriber", ] @@ -2156,6 +2149,7 @@ dependencies = [ [[package]] name = "swss-common-testing" version = "0.1.0" +source = "git+https://github.com/sonic-net/sonic-swss-common.git?branch=master#93af9275a3c413b45a47e4fd7967f15e2ea13297" dependencies = [ "lazy_static", "rand", diff --git a/Cargo.toml b/Cargo.toml index 96d62d4..dea146f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,8 +5,6 @@ members = [ "crates/swbus-core", "crates/swbusd", "crates/hamgrd", - "crates/swss-common", - "crates/swss-common-testing", "crates/swss-serde", "crates/swbus-edge", "crates/swbus-proto", @@ -92,8 +90,6 @@ swbus-core = { version = "0.1.0", path = "crates/swbus-core" } swbus-edge = { version = "0.1.0", path = "crates/swbus-edge" } swbus-config = { version = "0.1.0", path = "crates/swbus-config" } swss-serde = { version = "0.1.0", path = "crates/swss-serde" } -swss-common = { version = "0.1.0", path = "crates/swss-common" } -swss-common-testing = { version = "0.1.0", path = "crates/swss-common-testing" } swbus-actor = { version = "0.1.0", path = "crates/swbus-actor" } sonicdb-derive = { version = "0.1.0", path = "crates/sonicdb-derive" } diff --git a/crates/container/Cargo.toml b/crates/container/Cargo.toml index 846d86c..e5b4daf 100644 --- a/crates/container/Cargo.toml +++ b/crates/container/Cargo.toml @@ -14,7 +14,7 @@ chrono = { workspace = true } enumset = { workspace = true } futures-util = { workspace = true } clap = { workspace = true } -swss-common = { path = "../swss-common" } +swss-common = { git = "https://github.com/sonic-net/sonic-swss-common.git", branch = "master" } tokio = { workspace = true } serde_json = { workspace = true } thiserror = { workspace = true } diff --git a/crates/hamgrd/Cargo.toml b/crates/hamgrd/Cargo.toml index 2bcc39a..504a271 100644 --- a/crates/hamgrd/Cargo.toml +++ b/crates/hamgrd/Cargo.toml @@ -15,10 +15,10 @@ edition.workspace = true [dependencies] swbus-edge = { path = "../swbus-edge" } swbus-actor = { path = "../swbus-actor" } -swss-common = { path = "../swss-common" } +swss-common = { git = "https://github.com/sonic-net/sonic-swss-common.git", branch = "master" } swss-common-bridge = { path = "../swss-common-bridge" } swss-serde = { path = "../swss-serde" } -swss-common-testing.workspace = true +swss-common-testing = { git = "https://github.com/sonic-net/sonic-swss-common.git", branch = "master" } swbus-config.workspace = true sonic-common.workspace = true sonicdb-derive.workspace = true diff --git a/crates/sonic-common/Cargo.toml b/crates/sonic-common/Cargo.toml index 0da1ff4..45e5b9d 100644 --- a/crates/sonic-common/Cargo.toml +++ b/crates/sonic-common/Cargo.toml @@ -24,7 +24,7 @@ signal-hook.workspace = true color-eyre.workspace = true # internal dependencies -swss-common.workspace = true +swss-common = { git = "https://github.com/sonic-net/sonic-swss-common.git", branch = "master" } # Utils lazy_static.workspace = true diff --git a/crates/sonicdb-derive/Cargo.toml b/crates/sonicdb-derive/Cargo.toml index 3e3a0b5..e3d8479 100644 --- a/crates/sonicdb-derive/Cargo.toml +++ b/crates/sonicdb-derive/Cargo.toml @@ -12,7 +12,7 @@ edition.workspace = true syn = "2.0" # For parsing Rust code quote = "1.0" # For generating Rust code proc-macro2 = "1.0" # For working with procedural macros -swss-common.workspace = true +swss-common = { git = "https://github.com/sonic-net/sonic-swss-common.git", branch = "master" } [lib] proc-macro = true diff --git a/crates/swbus-actor/Cargo.toml b/crates/swbus-actor/Cargo.toml index 8f844ae..338ed3b 100644 --- a/crates/swbus-actor/Cargo.toml +++ b/crates/swbus-actor/Cargo.toml @@ -10,7 +10,7 @@ edition.workspace = true [dependencies] swbus-edge = { path = "../swbus-edge" } -swss-common = { path = "../swss-common", features = ["async"] } +swss-common = { git = "https://github.com/sonic-net/sonic-swss-common.git", branch = "master", features = ["async"] } tokio.workspace = true serde.workspace = true serde_json.workspace = true @@ -22,4 +22,4 @@ tracing.workspace = true workspace = true [dev-dependencies] -swss-common-testing = { path = "../swss-common-testing" } +swss-common-testing = { git = "https://github.com/sonic-net/sonic-swss-common.git", branch = "master" } diff --git a/crates/swbus-cli/Cargo.toml b/crates/swbus-cli/Cargo.toml index c7336cf..60ad4cc 100644 --- a/crates/swbus-cli/Cargo.toml +++ b/crates/swbus-cli/Cargo.toml @@ -35,8 +35,8 @@ swbus-config.workspace = true swbus-actor.workspace = true [dev-dependencies] -swss-common-testing.workspace = true -swss-common.workspace = true +swss-common-testing = { git = "https://github.com/sonic-net/sonic-swss-common.git", branch = "master" } +swss-common = { git = "https://github.com/sonic-net/sonic-swss-common.git", branch = "master" } swss-serde.workspace = true [lints] diff --git a/crates/swbus-config/Cargo.toml b/crates/swbus-config/Cargo.toml index 54e891f..ab385bb 100644 --- a/crates/swbus-config/Cargo.toml +++ b/crates/swbus-config/Cargo.toml @@ -15,11 +15,11 @@ serde_json.workspace = true thiserror.workspace = true tracing.workspace = true swss-serde.workspace = true -swss-common.workspace = true +swss-common = { git = "https://github.com/sonic-net/sonic-swss-common.git", branch = "master" } swbus-proto.workspace = true [dev-dependencies] -swss-common-testing.workspace = true +swss-common-testing = { git = "https://github.com/sonic-net/sonic-swss-common.git", branch = "master" } tempfile.workspace = true [lints] diff --git a/crates/swss-common-bridge/Cargo.toml b/crates/swss-common-bridge/Cargo.toml index 0fd2bff..2153471 100644 --- a/crates/swss-common-bridge/Cargo.toml +++ b/crates/swss-common-bridge/Cargo.toml @@ -9,7 +9,7 @@ keywords.workspace = true edition.workspace = true [dependencies] -swss-common = { path = "../swss-common", features = ["async"] } +swss-common = { git = "https://github.com/sonic-net/sonic-swss-common.git", branch = "master", features = ["async"] } swbus-edge = { path = "../swbus-edge" } tokio.workspace = true tokio-util.workspace = true @@ -19,4 +19,4 @@ swbus-actor = { path = "../swbus-actor" } workspace = true [dev-dependencies] -swss-common-testing = { path = "../swss-common-testing" } +swss-common-testing = { git = "https://github.com/sonic-net/sonic-swss-common.git", branch = "master" } diff --git a/crates/swss-common-testing/Cargo.toml b/crates/swss-common-testing/Cargo.toml deleted file mode 100644 index 8525979..0000000 --- a/crates/swss-common-testing/Cargo.toml +++ /dev/null @@ -1,17 +0,0 @@ -[package] -name = "swss-common-testing" -version.workspace = true -authors.workspace = true -license.workspace = true -repository.workspace = true -documentation.workspace = true -keywords.workspace = true -edition.workspace = true - -[dependencies] -swss-common = { path = "../swss-common" } -rand = "0.8.5" -lazy_static.workspace = true - -[lints] -workspace = true diff --git a/crates/swss-common-testing/src/lib.rs b/crates/swss-common-testing/src/lib.rs deleted file mode 100644 index 7b9c587..0000000 --- a/crates/swss-common-testing/src/lib.rs +++ /dev/null @@ -1,268 +0,0 @@ -use std::{ - collections::HashMap, - fs::{self, remove_file}, - io::{BufRead, BufReader}, - iter, - process::{Child, Command, Stdio}, - sync::Mutex, -}; - -use lazy_static::lazy_static; -use std::sync::Arc; - -use rand::{random, Rng}; - -use swss_common::*; - -lazy_static! { - static ref CONFIG_DB: Mutex>> = Mutex::new(None); -} - -pub struct Redis { - pub proc: Child, - pub sock: String, -} - -impl Redis { - /// Start a Redis instance with a random unix socket. Multiple instances can be started. - /// It is mutually exclusive with start_config_db(). - pub fn start() -> Self { - sonic_db_config_init_for_test(); - Redis::start_with_sock(random_unix_sock()) - } - - /// Start a Redis with config_db. Only one instance can be started at a time. - /// It is mutually exclusive with start(). - pub fn start_config_db() -> Arc { - CONFIG_DB - .lock() - .unwrap() - .get_or_insert_with(|| { - let sock_str = random_unix_sock(); - config_db_config_init_for_test(&sock_str); - Arc::new(Redis::start_with_sock(sock_str)) - }) - .clone() - } - - fn start_with_sock(sock: String) -> Self { - #[rustfmt::skip] - #[allow(clippy::zombie_processes)] - let mut child = Command::new("timeout") - .args([ - "--signal=KILL", - "15s", - "redis-server", - "--appendonly", "no", - "--save", "", - "--notify-keyspace-events", "AKE", - "--port", "0", - "--unixsocket", &sock, - ]) - .stdout(Stdio::piped()) - .spawn() - .unwrap(); - let mut stdout = BufReader::new(child.stdout.take().unwrap()); - let mut buf = String::new(); - loop { - buf.clear(); - if stdout.read_line(&mut buf).unwrap() == 0 { - panic!("Redis didn't start"); - } - // Some redis version capitalize "Ready", others have lowercase "ready" :P - if buf.contains("eady to accept connections") { - break Self { proc: child, sock }; - } - } - } - - pub fn db_connector(&self) -> DbConnector { - DbConnector::new_unix(0, &self.sock, 0).unwrap() - } -} - -impl Drop for Redis { - fn drop(&mut self) { - Command::new("kill") - .args(["-s", "TERM", &self.proc.id().to_string()]) - .status() - .unwrap(); - self.proc.wait().unwrap(); - } -} - -pub struct Defer(Option); - -impl Drop for Defer { - fn drop(&mut self) { - self.0.take().unwrap()() - } -} - -const DB_CONFIG_JSON: &str = r#" - { - "DATABASES": { - "db name doesn't matter": { - "id": 0, - "separator": ":", - "instance": "redis" - } - } - } - "#; - -const CONFIG_DB_REDIS_CONFIG_JSON: &str = r#" - { - "INSTANCES": { - "redis": { - "hostname": "127.0.0.1", - "port": {port}, - "unix_socket_path": "{path}", - "persistence_for_warm_boot": "yes" - } - }, - "DATABASES": { - "APPL_DB": { - "id": 0, - "separator": ":", - "instance": "redis" - }, - "CONFIG_DB": { - "id": 1, - "separator": "|", - "instance": "redis" - }, - "STATE_DB": { - "id": 2, - "separator": "|", - "instance": "redis" - }, - "DPU_STATE_DB": { - "id": 3, - "separator": "|", - "instance": "redis" - }, - "DPU_APPL_DB": { - "id": 4, - "separator": ":", - "instance": "redis" - } - } - } -"#; - -const DB_GLOBAL_CONFIG_JSON: &str = r#" - { - "INCLUDES" : [ - { - "include" : "db_config_test.json" - }, - { - "container_name" : "dpu0", - "include" : "db_config_test.json" - - } - ] - } -"#; - -static SONIC_DB_INITIALIZED: Mutex = Mutex::new(false); -static CONIFG_DB_INITIALIZED: Mutex = Mutex::new(false); - -pub fn sonic_db_config_init_for_test() { - // HACK - // We need to do our own locking here because locking is not correctly implemented in - // swss::SonicDBConfig :/ - let config_db_init = CONIFG_DB_INITIALIZED.lock().unwrap(); - // config_db and sonic_db are mutually exclusive because the lock in swss::SonicDBConfig - assert!(!*config_db_init); - - let mut sonic_db_init = SONIC_DB_INITIALIZED.lock().unwrap(); - if !*sonic_db_init { - fs::write("/tmp/db_config_test.json", DB_CONFIG_JSON).unwrap(); - fs::write("/tmp/db_global_config_test.json", DB_GLOBAL_CONFIG_JSON).unwrap(); - sonic_db_config_initialize("/tmp/db_config_test.json").unwrap(); - sonic_db_config_initialize_global("/tmp/db_global_config_test.json").unwrap(); - fs::remove_file("/tmp/db_config_test.json").unwrap(); - fs::remove_file("/tmp/db_global_config_test.json").unwrap(); - *sonic_db_init = true; - } -} - -fn config_db_config_init_for_test(sock_str: &str) { - // HACK - // We need to do our own locking here because locking is not correctly implemented in - // swss::SonicDBConfig :/ - let sonic_db_init = SONIC_DB_INITIALIZED.lock().unwrap(); - // config_db and sonic_db are mutually exclusive because the lock in swss::SonicDBConfig - assert!(!*sonic_db_init); - - let mut config_db_init = CONIFG_DB_INITIALIZED.lock().unwrap(); - if !*config_db_init { - let port = random_port(); - let db_config_json = CONFIG_DB_REDIS_CONFIG_JSON - .replace("{port}", &port.to_string()) - .replace("{path}", sock_str); - fs::write("/tmp/db_config_test.json", db_config_json).unwrap(); - fs::write("/tmp/db_global_config_test.json", DB_GLOBAL_CONFIG_JSON).unwrap(); - sonic_db_config_initialize("/tmp/db_config_test.json").unwrap(); - sonic_db_config_initialize_global("/tmp/db_global_config_test.json").unwrap(); - fs::remove_file("/tmp/db_config_test.json").unwrap(); - fs::remove_file("/tmp/db_global_config_test.json").unwrap(); - *config_db_init = true; - } -} - -pub fn random_string() -> String { - format!("{:0X}", random::()) -} - -pub fn random_cxx_string() -> CxxString { - CxxString::new(random_string()) -} - -pub fn random_fvs() -> FieldValues { - let mut field_values = HashMap::new(); - for _ in 0..rand::thread_rng().gen_range(100..1000) { - field_values.insert(random_string(), random_cxx_string()); - } - field_values -} - -pub fn random_kfv() -> KeyOpFieldValues { - let key = random_string(); - let operation = if random() { KeyOperation::Set } else { KeyOperation::Del }; - let field_values = if operation == KeyOperation::Set { - // We need at least one field-value pair, otherwise swss::BinarySerializer infers that - // the operation is DEL even if the .operation field is SET - random_fvs() - } else { - HashMap::new() - }; - - KeyOpFieldValues { - key, - operation, - field_values, - } -} - -pub fn random_kfvs() -> Vec { - iter::repeat_with(random_kfv).take(100).collect() -} - -pub fn random_unix_sock() -> String { - format!("/tmp/swss-common-testing-{}.sock", random_string()) -} - -pub fn random_port() -> u16 { - let mut rng = rand::thread_rng(); - rng.gen_range(1000..65535) -} - -// zmq doesn't clean up its own ipc sockets, so we include a deferred operation for that -pub fn random_zmq_endpoint() -> (String, impl Drop) { - let sock = random_unix_sock(); - let endpoint = format!("ipc://{sock}"); - (endpoint, Defer(Some(|| remove_file(sock).unwrap()))) -} diff --git a/crates/swss-common/Cargo.toml b/crates/swss-common/Cargo.toml deleted file mode 100644 index bfe17f2..0000000 --- a/crates/swss-common/Cargo.toml +++ /dev/null @@ -1,31 +0,0 @@ -[package] -name = "swss-common" -version.workspace = true -authors.workspace = true -license.workspace = true -repository.workspace = true -documentation.workspace = true -keywords.workspace = true -edition.workspace = true - -[lints] -workspace = true - -[features] -async = ["dep:tokio"] - -[dependencies] -libc = "0.2.158" -tokio = { version = "1", optional = true, features = ["net", "rt"] } -serde.workspace = true -getset.workspace = true -lazy_static.workspace = true -tracing-subscriber.workspace = true - -[build-dependencies] -bindgen = "0.70.1" - -[dev-dependencies] -swss-common-testing = { path = "../swss-common-testing" } -paste = "1.0.15" -tokio = { version = "1", features = ["rt-multi-thread", "macros", "time"] } diff --git a/crates/swss-common/build.rs b/crates/swss-common/build.rs deleted file mode 100644 index e89a3bb..0000000 --- a/crates/swss-common/build.rs +++ /dev/null @@ -1,54 +0,0 @@ -use std::{ - env, fs, - path::{Path, PathBuf}, -}; - -fn main() { - let header_paths = match env::var_os("SWSS_COMMON_REPO") { - // if SWSS_COMMON_REPO is specified, we will build this library based on the files found in the repo - Some(path_string) => { - // Get full path of repo - let path = Path::new(&path_string); - let path = fs::canonicalize(path).expect("canonicalizing SWSS_COMMON_REPO path"); - assert!(path.exists(), "{path:?} doesn't exist"); - - // Add libs path to linker search - let libs_path = path.join("common/.libs").to_str().unwrap().to_string(); - println!("cargo:rustc-link-search=native={libs_path}"); - - // include all of common/c-api/*.h - find_headers_in_dir(path.join("common/c-api")) - } - - // otherwise, we will assume the libswsscommon-dev package is installed and files - // will be found at the locations defined in /debian/libswsscommon-dev.install - None => { - eprintln!("NOTE: If you are compiling without installing libswsscommon-dev, you must set SWSS_COMMON_REPO to the path of a built copy of the sonic-swss-common repo"); - find_headers_in_dir("/usr/include/swss/c-api") - } - }; - - for h in &header_paths { - println!("cargo:rerun-if-changed={h}"); - } - - let out_path = PathBuf::from(env::var("OUT_DIR").unwrap()).join("bindings.rs"); - bindgen::builder() - .headers(&header_paths) - .derive_partialeq(true) - .generate() - .unwrap() - .write_to_file(out_path) - .unwrap(); - - println!("cargo:rustc-link-lib=dylib=swsscommon"); -} - -fn find_headers_in_dir(path: impl AsRef) -> Vec { - fs::read_dir(path) - .unwrap() - .map(|res_entry| res_entry.unwrap().path()) - .filter(|p| p.extension().is_some_and(|ext| ext == "h")) - .map(|p| p.to_str().unwrap().to_string()) - .collect::>() -} diff --git a/crates/swss-common/src/lib.rs b/crates/swss-common/src/lib.rs deleted file mode 100644 index 4082429..0000000 --- a/crates/swss-common/src/lib.rs +++ /dev/null @@ -1,27 +0,0 @@ -mod bindings { - #![allow(unused, non_snake_case, non_upper_case_globals, non_camel_case_types)] - include!(concat!(env!("OUT_DIR"), "/bindings.rs")); -} -mod types; - -pub use types::*; - -/// Rust wrapper around `swss::SonicDBConfig::initialize`. -pub fn sonic_db_config_initialize(path: &str) -> Result<(), Exception> { - let path = cstr(path); - unsafe { swss_try!(bindings::SWSSSonicDBConfig_initialize(path.as_ptr())) } -} - -/// Rust wrapper around `swss::SonicDBConfig::initializeGlobalConfig`. -pub fn sonic_db_config_initialize_global(path: &str) -> Result<(), Exception> { - let path = cstr(path); - unsafe { swss_try!(bindings::SWSSSonicDBConfig_initializeGlobalConfig(path.as_ptr())) } -} - -/// Trait for objects that can be stored in a Sonic DB table. -pub trait SonicDbTable { - fn key_separator() -> char; - fn table_name() -> &'static str; - fn db_name() -> &'static str; - fn is_dpu() -> bool; -} diff --git a/crates/swss-common/src/types.rs b/crates/swss-common/src/types.rs deleted file mode 100644 index 0bc286c..0000000 --- a/crates/swss-common/src/types.rs +++ /dev/null @@ -1,314 +0,0 @@ -#[cfg(feature = "async")] -mod async_util; - -mod consumerstatetable; -mod cxxstring; -mod dbconnector; -mod exception; -mod logger; -mod producerstatetable; -mod subscriberstatetable; -mod table; -mod zmqclient; -mod zmqconsumerstatetable; -mod zmqproducerstatetable; -mod zmqserver; - -pub use consumerstatetable::ConsumerStateTable; -pub use cxxstring::{CxxStr, CxxString}; -pub use dbconnector::{DbConnectionInfo, DbConnector}; -pub use exception::{Exception, Result}; -pub use logger::{link_to_swsscommon_logger, log_level, log_output, LoggerConfigChangeHandler}; -pub use producerstatetable::ProducerStateTable; -pub use subscriberstatetable::SubscriberStateTable; -pub use table::Table; -pub use zmqclient::ZmqClient; -pub use zmqconsumerstatetable::ZmqConsumerStateTable; -pub use zmqproducerstatetable::ZmqProducerStateTable; -pub use zmqserver::ZmqServer; - -pub(crate) use exception::swss_try; - -use crate::bindings::*; -use cxxstring::RawMutableSWSSString; -use serde::{Deserialize, Serialize}; -use std::{ - any::Any, - collections::HashMap, - error::Error, - ffi::{CStr, CString}, - fmt::Display, - slice, - str::FromStr, -}; - -pub(crate) fn cstr(s: impl AsRef<[u8]>) -> CString { - CString::new(s.as_ref()).expect("Bytes being converted to a C string already contains a null byte") -} - -/// Take a malloc'd c string and convert it to a native String -pub(crate) unsafe fn take_cstr(p: *const libc::c_char) -> String { - let s = CStr::from_ptr(p) - .to_str() - .expect("C string being converted to Rust String contains invalid UTF-8") - .to_string(); - libc::free(p as *mut libc::c_void); - s -} - -/// Rust version of the return type from `swss::Select::select`. -/// -/// This enum does not include the `swss::Select::ERROR` because errors are handled via a different -/// mechanism in this library. -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub enum SelectResult { - /// Data is now available. - /// (`swss::Select::OBJECT`) - Data, - /// Waiting was interrupted by a signal. - /// (`swss::Select::SIGNALINT`) - Signal, - /// Timed out. - /// (`swss::Select::TIMEOUT`) - Timeout, -} - -impl SelectResult { - pub(crate) fn from_raw(raw: SWSSSelectResult) -> Self { - if raw == SWSSSelectResult_SWSSSelectResult_DATA { - SelectResult::Data - } else if raw == SWSSSelectResult_SWSSSelectResult_SIGNAL { - SelectResult::Signal - } else if raw == SWSSSelectResult_SWSSSelectResult_TIMEOUT { - SelectResult::Timeout - } else { - unreachable!("Invalid SWSSSelectResult: {raw}"); - } - } -} - -/// Type of the `operation` field in [KeyOpFieldValues]. -/// -/// In swsscommon, this is represented as a string of `"SET"` or `"DEL"`. -/// This type can be constructed similarly - `let op: KeyOperation = "SET".parse().unwrap()`. -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] -pub enum KeyOperation { - Set, - Del, -} - -impl KeyOperation { - pub(crate) fn as_raw(self) -> SWSSKeyOperation { - match self { - KeyOperation::Set => SWSSKeyOperation_SWSSKeyOperation_SET, - KeyOperation::Del => SWSSKeyOperation_SWSSKeyOperation_DEL, - } - } - - pub(crate) fn from_raw(raw: SWSSKeyOperation) -> Self { - if raw == SWSSKeyOperation_SWSSKeyOperation_SET { - KeyOperation::Set - } else if raw == SWSSKeyOperation_SWSSKeyOperation_DEL { - KeyOperation::Del - } else { - unreachable!("Invalid SWSSKeyOperation: {raw}"); - } - } -} - -impl FromStr for KeyOperation { - type Err = InvalidKeyOperationString; - - /// Create a KeyOperation from `"SET"` or `"DEL"` (case insensitive). - fn from_str(s: &str) -> Result { - let Ok(mut bytes): Result<[u8; 3], _> = s.as_bytes().try_into() else { - return Err(InvalidKeyOperationString(s.to_string())); - }; - bytes.make_ascii_uppercase(); - match &bytes { - b"SET" => Ok(Self::Set), - b"DEL" => Ok(Self::Del), - _ => Err(InvalidKeyOperationString(s.to_string())), - } - } -} - -/// Error type indicating that a `KeyOperation` string was neither `"SET"` nor `"DEL"`. -#[derive(Debug)] -pub struct InvalidKeyOperationString(String); - -impl Display for InvalidKeyOperationString { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, r#"A KeyOperation String must be "SET" or "DEL", but was {}"#, self.0) - } -} - -impl Error for InvalidKeyOperationString {} - -/// Rust version of `vector`. -pub type FieldValues = HashMap; - -/// Rust version of `swss::KeyOpFieldsValuesTuple`. -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct KeyOpFieldValues { - pub key: String, - pub operation: KeyOperation, - pub field_values: FieldValues, -} - -impl KeyOpFieldValues { - pub fn set(key: K, fvs: I) -> Self - where - K: Into, - I: IntoIterator, - F: Into, - V: Into, - { - Self { - key: key.into(), - operation: KeyOperation::Set, - field_values: fvs.into_iter().map(|(f, v)| (f.into(), v.into())).collect(), - } - } - - pub fn del(key: K) -> Self - where - K: Into, - { - Self { - key: key.into(), - operation: KeyOperation::Del, - field_values: HashMap::new(), - } - } -} - -/// Intended for testing, ordered by key. -impl PartialOrd for KeyOpFieldValues { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -/// Intended for testing, ordered by key. -impl Ord for KeyOpFieldValues { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.key.cmp(&other.key) - } -} - -/// Takes ownership of an `SWSSFieldValueArray` and turns it into a native representation. -pub(crate) unsafe fn take_field_value_array(arr: SWSSFieldValueArray) -> FieldValues { - let mut out = HashMap::with_capacity(arr.len as usize); - if !arr.data.is_null() { - let entries = slice::from_raw_parts(arr.data, arr.len as usize); - for fv in entries { - let field = take_cstr(fv.field); - let value = CxxString::take(fv.value).unwrap(); - out.insert(field, value); - } - SWSSFieldValueArray_free(arr); - } - out -} - -/// Takes ownership of an `SWSSKeyOpFieldValuesArray` and turns it into a native representation. -pub(crate) unsafe fn take_key_op_field_values_array(kfvs: SWSSKeyOpFieldValuesArray) -> Vec { - let mut out = Vec::with_capacity(kfvs.len as usize); - if !kfvs.data.is_null() { - unsafe { - let entries = slice::from_raw_parts(kfvs.data, kfvs.len as usize); - for kfv in entries { - let key = take_cstr(kfv.key); - let operation = KeyOperation::from_raw(kfv.operation); - let field_values = take_field_value_array(kfv.fieldValues); - out.push(KeyOpFieldValues { - key, - operation, - field_values, - }); - } - SWSSKeyOpFieldValuesArray_free(kfvs); - }; - } - out -} - -/// Takes ownership of an `SWSSStringArray` and turns it into a native representation. -pub(crate) unsafe fn take_string_array(arr: SWSSStringArray) -> Vec { - let out = if !arr.data.is_null() { - let entries = slice::from_raw_parts(arr.data, arr.len as usize); - Vec::from_iter(entries.iter().map(|&s| take_cstr(s))) - } else { - Vec::new() - }; - SWSSStringArray_free(arr); - out -} - -pub(crate) fn make_field_value_array(fvs: I) -> (SWSSFieldValueArray, KeepAlive) -where - I: IntoIterator, - F: AsRef<[u8]>, - V: Into, -{ - let mut k = KeepAlive::default(); - let mut data = Vec::new(); - - for (field, value) in fvs { - let field = cstr(field); - let value_cxxstring: CxxString = value.into(); - let value_rawswssstring: RawMutableSWSSString = value_cxxstring.into_raw(); - data.push(SWSSFieldValueTuple { - field: field.as_ptr(), - value: value_rawswssstring.as_raw(), - }); - k.keep((field, value_rawswssstring)); - } - - let arr = SWSSFieldValueArray { - data: data.as_mut_ptr(), - len: data.len().try_into().unwrap(), - }; - k.keep(data); - - (arr, k) -} - -pub(crate) fn make_key_op_field_values_array(kfvs: I) -> (SWSSKeyOpFieldValuesArray, KeepAlive) -where - I: IntoIterator, -{ - let mut k = KeepAlive::default(); - let mut data = Vec::new(); - - for kfv in kfvs { - let key = cstr(kfv.key); - let operation = kfv.operation.as_raw(); - let (field_values, arr_k) = make_field_value_array(kfv.field_values); - data.push(SWSSKeyOpFieldValues { - key: key.as_ptr(), - operation, - fieldValues: field_values, - }); - k.keep(Box::new((key, arr_k))) - } - - let arr = SWSSKeyOpFieldValuesArray { - data: data.as_mut_ptr(), - len: data.len().try_into().unwrap(), - }; - k.keep(Box::new(data)); - - (arr, k) -} - -/// Helper struct to keep rust-owned data alive while it is in use by C++ -#[derive(Default)] -pub(crate) struct KeepAlive(Vec>); - -impl KeepAlive { - fn keep(&mut self, t: T) { - self.0.push(Box::new(t)) - } -} diff --git a/crates/swss-common/src/types/async_util.rs b/crates/swss-common/src/types/async_util.rs deleted file mode 100644 index 154e115..0000000 --- a/crates/swss-common/src/types/async_util.rs +++ /dev/null @@ -1,46 +0,0 @@ -/// Take a blocking closure and run it to completion in a [`tokio::task::spawn_blocking`] thread. -pub(crate) async fn spawn_blocking_scoped T + Send, T: Send + 'static>(f: F) -> T { - let clos: Box T + Send> = Box::new(f); - // SAFETY: We are joining the spawned task before the current lifetime ends, so it is sound to - // access any data borrowed from this function in the spawned task. However, I acknowledge that - // extending lifetimes with transmute is terrible. Hail Mary. - let clos_static: Box T + Send + 'static> = unsafe { std::mem::transmute(clos) }; - ::tokio::task::spawn_blocking(clos_static).await.unwrap() -} - -/// These are "basic" because they just use spawn_blocking instead of a more specialized implementation. -/// See [`tokio::task::spawn_blocking`]. -macro_rules! impl_basic_async_method { - // Method (with self) - ($async_fn:ident <= $sync_fn:ident $(<$($generic_decls:tt),*>)? (& $(mut)? self $(, $arg:ident : $typ:ty)*) $(-> $ret_ty:ty)? $(where $($generic_bounds:tt)*)?) => { - #[doc = concat!("Async version of [`", stringify!($sync_fn), "`](Self::", stringify!($sync_fn), ") that uses `tokio::task::spawn_blocking`.")] - pub async fn $async_fn $(<$($generic_decls),*>)? (&mut self $(, $arg: $typ)*) $(-> $ret_ty)? $(where $($generic_bounds)*)? { - $crate::types::async_util::spawn_blocking_scoped(move || self.$sync_fn($($arg),*)).await - } - }; - - // Associated fn (without self) - ($async_fn:ident <= $sync_fn:ident $(<$($generic_decls:tt),*>)? ($($arg:ident : $typ:ty),*) $(-> $ret_ty:ty)? $(where $($generic_bounds:tt)*)?) => { - #[doc = concat!("Async version of [`", stringify!($sync_fn), "`](Self::", stringify!($sync_fn), ") that uses `tokio::task::spawn_blocking`.")] - pub async fn $async_fn $(<$($generic_decls),*>)? ($($arg: $typ),*) $(-> $ret_ty)? $(where $($generic_bounds)*)? { - $crate::types::async_util::spawn_blocking_scoped(move || Self::$sync_fn($($arg),*)).await - } - }; -} -pub(crate) use impl_basic_async_method; - -macro_rules! impl_read_data_async { - () => { - /// Async version of [`read_data`](Self::read_data). Does not time out or interrupt on signal. - pub async fn read_data_async(&mut self) -> ::std::io::Result<()> { - use ::tokio::io::{unix::AsyncFd, Interest}; - - let fd = self.get_fd().map_err(::std::io::Error::other)?; - let _ready_guard = AsyncFd::with_interest(fd, Interest::READABLE)?.readable().await?; - self.read_data(Duration::from_secs(0), false) - .map_err(::std::io::Error::other)?; - Ok(()) - } - }; -} -pub(crate) use impl_read_data_async; diff --git a/crates/swss-common/src/types/consumerstatetable.rs b/crates/swss-common/src/types/consumerstatetable.rs deleted file mode 100644 index 7789b72..0000000 --- a/crates/swss-common/src/types/consumerstatetable.rs +++ /dev/null @@ -1,82 +0,0 @@ -use super::*; -use crate::bindings::*; -use std::{os::fd::BorrowedFd, ptr::null, time::Duration}; - -/// Rust wrapper around `swss::ConsumerStateTable`. -#[derive(Debug)] -pub struct ConsumerStateTable { - ptr: SWSSConsumerStateTable, - db: DbConnector, - table_name: String, -} - -impl ConsumerStateTable { - pub fn new(db: DbConnector, table_name: &str, pop_batch_size: Option, pri: Option) -> Result { - let table_name_string = String::from(table_name); - let table_name = cstr(table_name); - let pop_batch_size = pop_batch_size.as_ref().map(|n| n as *const i32).unwrap_or(null()); - let pri = pri.as_ref().map(|n| n as *const i32).unwrap_or(null()); - let ptr = unsafe { - swss_try!(p_cst => SWSSConsumerStateTable_new(db.ptr, table_name.as_ptr(), pop_batch_size, pri, p_cst))? - }; - Ok(Self { - ptr, - db, - table_name: table_name_string, - }) - } - - pub fn pops(&self) -> Result> { - unsafe { - let arr = swss_try!(p_arr => SWSSConsumerStateTable_pops(self.ptr, p_arr))?; - Ok(take_key_op_field_values_array(arr)) - } - } - - pub fn get_fd(&self) -> Result { - // SAFETY: This fd represents the underlying redis connection, which should stay alive - // as long as the DbConnector does. - unsafe { - let fd = swss_try!(p_fd => SWSSConsumerStateTable_getFd(self.ptr, p_fd))?; - let fd = BorrowedFd::borrow_raw(fd.try_into().unwrap()); - Ok(fd) - } - } - - pub fn read_data(&self, timeout: Duration, interrupt_on_signal: bool) -> Result { - let timeout_ms = timeout.as_millis().try_into().unwrap(); - let res = unsafe { - swss_try!(p_res => { - SWSSConsumerStateTable_readData(self.ptr, timeout_ms, interrupt_on_signal as u8, p_res) - })? - }; - Ok(SelectResult::from_raw(res)) - } - - pub fn db_connector(&self) -> &DbConnector { - &self.db - } - - pub fn db_connector_mut(&mut self) -> &mut DbConnector { - &mut self.db - } - - pub fn table_name(&self) -> &str { - &self.table_name - } -} - -impl Drop for ConsumerStateTable { - fn drop(&mut self) { - unsafe { swss_try!(SWSSConsumerStateTable_free(self.ptr)).expect("Dropping ConsumerStateTable") }; - } -} - -unsafe impl Send for ConsumerStateTable {} - -/// Async versions of methods -#[cfg(feature = "async")] -impl ConsumerStateTable { - async_util::impl_read_data_async!(); - async_util::impl_basic_async_method!(pops_async <= pops(&self) -> Result>); -} diff --git a/crates/swss-common/src/types/cxxstring.rs b/crates/swss-common/src/types/cxxstring.rs deleted file mode 100644 index c9989e9..0000000 --- a/crates/swss-common/src/types/cxxstring.rs +++ /dev/null @@ -1,328 +0,0 @@ -use crate::bindings::*; -use serde::{Deserialize, Serialize}; -use std::{ - borrow::{Borrow, Cow}, - fmt::Debug, - hash::Hash, - ops::Deref, - ptr::NonNull, - slice, - str::Utf8Error, -}; - -/// A C++ `std::string` that can be moved around and accessed from Rust. -#[repr(transparent)] -#[derive(Eq)] -pub struct CxxString { - ptr: NonNull, -} - -impl CxxString { - /// Construct a CxxString from an SWSSString. - /// The `CxxString` is now responsible for freeing the string, so it should not be freed manually. - /// Returns `None` if `s` is null. - pub(crate) unsafe fn take(s: SWSSString) -> Option { - NonNull::new(s).map(|ptr| CxxString { ptr }) - } - - /// Convert `self` into a wrapper type which provides safe mutable access to the underlying `std::string`. - pub(crate) fn into_raw(self) -> RawMutableSWSSString { - RawMutableSWSSString(self) - } - - /// Copies the given data into a new C++ string. - pub fn new(data: impl AsRef<[u8]>) -> CxxString { - unsafe { - let ptr = data.as_ref().as_ptr() as *const libc::c_char; - let len = data.as_ref().len().try_into().unwrap(); - CxxString::take(SWSSString_new(ptr, len)).unwrap() - } - } - - /// Borrows a `CxxStr` from this string. - /// - /// Like `String::as_str`, this method is unnecessary where deref coercion can be used. - pub fn as_cxx_str(&self) -> &CxxStr { - self - } -} - -impl Serialize for CxxString { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - self.as_cxx_str().serialize(serializer) - } -} - -impl<'de> Deserialize<'de> for CxxString { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - struct CxxStringVisitor; - - impl serde::de::Visitor<'_> for CxxStringVisitor { - type Value = CxxString; - - fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { - formatter.write_str("string or bytes") - } - - fn visit_bytes(self, v: &[u8]) -> Result - where - E: serde::de::Error, - { - Ok(CxxString::new(v)) - } - - fn visit_str(self, v: &str) -> Result - where - E: serde::de::Error, - { - Ok(CxxString::new(v)) - } - } - - deserializer.deserialize_bytes(CxxStringVisitor) - } -} - -/// Wrapper type around `CxxString` which disables access to methods that borrow the underlying -/// data, like `.deref()`. This prevents code from creating an `SWSSString` and `SWSSStrRef` at the -/// same time, which would cause a data race in multi-threaded contexts. -/// -/// A similar struct could be made which would allow access via just an `&mut CxxString`, but any -/// function taking `SWSSString` will destroy the underlying data anyway, so we might as well just -/// drop it when we're done. -/// -/// This newtype needs to exist (as opposed to into_raw returning the raw pointer) so that the -/// string is not dropped immediately. -pub(crate) struct RawMutableSWSSString(CxxString); - -impl RawMutableSWSSString { - pub(crate) fn as_raw(&self) -> SWSSString { - self.0.ptr.as_ptr() - } -} - -impl> From for CxxString { - fn from(bytes: T) -> Self { - CxxString::new(bytes.as_ref()) - } -} - -impl Drop for CxxString { - fn drop(&mut self) { - unsafe { SWSSString_free(self.ptr.as_ptr()) } - } -} - -/// This calls [CxxStr::to_string_lossy] which may clone the underlying data (i.e. may be expensive). -impl Debug for CxxString { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.deref().fmt(f) - } -} - -impl Clone for CxxString { - fn clone(&self) -> Self { - CxxString::new(self.as_bytes()) - } -} - -/// SAFETY: A C++ string has no thread related state. -unsafe impl Send for CxxString {} - -/// SAFETY: A `CxxString` can only be mutated through `.into_raw()` which takes ownership of `self`. -/// Thus, normal Rust borrow checking rules will prevent data races. -unsafe impl Sync for CxxString {} - -impl Deref for CxxString { - type Target = CxxStr; - - fn deref(&self) -> &Self::Target { - // SAFETY: CxxString and CxxStr are both repr(transparent) and identical in alignment & - // size, and the C API guarantees that SWSSString can always be cast into SWSSStrRef - unsafe { std::mem::transmute(self) } - } -} - -impl PartialOrd for CxxString { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for CxxString { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.deref().cmp(other) - } -} - -impl PartialEq for CxxString { - fn eq(&self, other: &Self) -> bool { - self.deref().eq(other) - } -} - -impl PartialEq for CxxString { - fn eq(&self, other: &CxxStr) -> bool { - self.deref().eq(other) - } -} - -impl PartialEq for CxxString { - fn eq(&self, other: &str) -> bool { - self.deref().eq(other) - } -} - -impl PartialEq<&str> for CxxString { - fn eq(&self, other: &&str) -> bool { - self.deref().eq(other) - } -} - -impl PartialEq for CxxString { - fn eq(&self, other: &String) -> bool { - self.eq(other.as_str()) - } -} - -impl Hash for CxxString { - fn hash(&self, state: &mut H) { - self.deref().hash(state); - } -} - -impl Borrow for CxxString { - fn borrow(&self) -> &CxxStr { - self.deref() - } -} - -/// Equivalent of a C++ `const std::string&`, which can be borrowed from a [`CxxString`]. -/// -/// `CxxStr` has the same conceptual relationship with `CxxString` as a Rust `&str` does with `String`. -#[repr(transparent)] -#[derive(Eq)] -pub struct CxxStr { - ptr: NonNull, -} - -impl CxxStr { - /// This is safe because the C api guarantees that `SWSSStrRef` is read-only. On `CxxString`, this would not be safe. - pub(crate) fn as_raw(&self) -> SWSSStrRef { - self.ptr.as_ptr() - } - - /// Length of the string, not including a null terminator. - pub fn len(&self) -> usize { - unsafe { SWSSStrRef_length(self.as_raw()).try_into().unwrap() } - } - - /// Returns `true` if `self` has a length of zero bytes. - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - /// The underlying bytes of the string, not including a null terminator. - pub fn as_bytes(&self) -> &[u8] { - unsafe { - let data = SWSSStrRef_c_str(self.as_raw()); - slice::from_raw_parts(data as *const u8, self.len()) - } - } - - /// Tries to convert the C++ string to a Rust `&str` without copying. This can only be done if - /// the string contains valid UTF-8. See [std::str::from_utf8]. - pub fn to_str(&self) -> Result<&str, Utf8Error> { - std::str::from_utf8(self.as_bytes()) - } - - /// Converts the C++ string to a Rust `&str` or `String`. If the string is valid UTF-8, the - /// result is a `&str` pointing to the original data. Otherwise, the result is a `String` with - /// a copy of the data, but with invalid UTF-8 replaced. See [String::from_utf8_lossy]. - pub fn to_string_lossy(&self) -> Cow<'_, str> { - String::from_utf8_lossy(self.as_bytes()) - } -} - -impl ToOwned for CxxStr { - type Owned = CxxString; - - fn to_owned(&self) -> Self::Owned { - CxxString::new(self.as_bytes()) - } -} - -impl Debug for CxxStr { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "\"{}\"", self.to_string_lossy()) - } -} - -impl PartialOrd for CxxStr { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for CxxStr { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.as_bytes().cmp(other.as_bytes()) - } -} - -impl Hash for CxxStr { - fn hash(&self, state: &mut H) { - self.as_bytes().hash(state); - } -} - -impl PartialEq for CxxStr { - fn eq(&self, other: &Self) -> bool { - self.as_bytes().eq(other.as_bytes()) - } -} - -impl PartialEq for CxxStr { - fn eq(&self, other: &CxxString) -> bool { - self.as_bytes().eq(other.as_bytes()) - } -} - -impl PartialEq for CxxStr { - fn eq(&self, other: &str) -> bool { - self.as_bytes().eq(other.as_bytes()) - } -} - -impl PartialEq<&str> for CxxStr { - fn eq(&self, other: &&str) -> bool { - self.eq(*other) - } -} - -impl PartialEq for CxxStr { - fn eq(&self, other: &String) -> bool { - self.eq(other.as_str()) - } -} - -/// SAFETY: `CxxStr` can never be obtained by value, only as a reference by dereferencing a -/// `CxxString`. Thus, the same logic as implementing `Sync` for `CxxString` applies. Mutation -/// is protected by `CxxString::into_raw` requiring an owned `self`. By the same logic, `Send` is -/// unnecessary to implement (you can never obtain a value to send). -unsafe impl Sync for CxxStr {} - -impl Serialize for CxxStr { - fn serialize(&self, serializer: S) -> Result { - match self.to_str() { - Ok(s) => serializer.serialize_str(s), - Err(_) => serializer.serialize_bytes(self.as_bytes()), - } - } -} diff --git a/crates/swss-common/src/types/dbconnector.rs b/crates/swss-common/src/types/dbconnector.rs deleted file mode 100644 index 152aded..0000000 --- a/crates/swss-common/src/types/dbconnector.rs +++ /dev/null @@ -1,249 +0,0 @@ -use super::*; -use crate::bindings::*; -use std::collections::HashMap; - -/// Rust wrapper around `swss::DBConnector`. -#[derive(Debug)] -pub struct DbConnector { - pub(crate) ptr: SWSSDBConnector, - connection: DbConnectionInfo, -} - -/// Details about how a DbConnector is connected to Redis -#[derive(Debug, Clone)] -pub enum DbConnectionInfo { - Tcp { - hostname: String, - port: u16, - db_id: i32, - }, - Unix { - sock_path: String, - db_id: i32, - }, - Named { - db_name: String, - is_tcp_conn: bool, - }, - Keyed { - db_name: String, - is_tcp_conn: bool, - container_name: String, - netns: String, - }, -} - -impl DbConnector { - /// Create a new DbConnector from [`DbConnectionInfo`]. - /// - /// Timeout of 0 means block indefinitely. - pub fn new(connection: DbConnectionInfo, timeout_ms: u32) -> Result { - let ptr = match &connection { - DbConnectionInfo::Tcp { hostname, port, db_id } => { - let hostname = cstr(hostname); - unsafe { - swss_try!(p_db => SWSSDBConnector_new_tcp(*db_id, hostname.as_ptr(), *port, timeout_ms, p_db))? - } - } - DbConnectionInfo::Unix { sock_path, db_id } => { - let sock_path = cstr(sock_path); - unsafe { swss_try!(p_db => SWSSDBConnector_new_unix(*db_id, sock_path.as_ptr(), timeout_ms, p_db))? } - } - DbConnectionInfo::Named { db_name, is_tcp_conn } => { - let db_name = cstr(db_name); - unsafe { - swss_try!(p_db => SWSSDBConnector_new_named(db_name.as_ptr(), timeout_ms, *is_tcp_conn as u8, p_db))? - } - } - DbConnectionInfo::Keyed { - db_name, - is_tcp_conn, - container_name, - netns, - } => { - let db_name = cstr(db_name); - let container_name = cstr(container_name); - let netns = cstr(netns); - unsafe { - swss_try!(p_db => SWSSDBConnector_new_keyed(db_name.as_ptr(), timeout_ms, *is_tcp_conn as u8, - container_name.as_ptr(), netns.as_ptr(), p_db))? - } - } - }; - - Ok(Self { ptr, connection }) - } - - /// Create a DbConnector from a named entry in the SONiC db config. - /// - /// Timeout of 0 means block indefinitely. - pub fn new_named(db_name: impl Into, is_tcp_conn: bool, timeout_ms: u32) -> Result { - let db_name = db_name.into(); - Self::new(DbConnectionInfo::Named { db_name, is_tcp_conn }, timeout_ms) - } - - /// Create a DbConnector over a tcp socket. - /// - /// Timeout of 0 means block indefinitely. - pub fn new_tcp(db_id: i32, hostname: impl Into, port: u16, timeout_ms: u32) -> Result { - let hostname = hostname.into(); - Self::new(DbConnectionInfo::Tcp { hostname, port, db_id }, timeout_ms) - } - - /// Create a DbConnector over a unix socket. - /// - /// Timeout of 0 means block indefinitely. - pub fn new_unix(db_id: i32, sock_path: impl Into, timeout_ms: u32) -> Result { - let sock_path = sock_path.into(); - Self::new(DbConnectionInfo::Unix { sock_path, db_id }, timeout_ms) - } - - pub fn new_keyed( - db_name: impl Into, - is_tcp_conn: bool, - timeout_ms: u32, - container_name: impl Into, - netns: impl Into, - ) -> Result { - let db_name = db_name.into(); - let container_name = container_name.into(); - let netns = netns.into(); - Self::new( - DbConnectionInfo::Keyed { - db_name, - is_tcp_conn, - container_name, - netns, - }, - timeout_ms, - ) - } - - /// Clone a DbConnector with a timeout. - /// - /// Timeout of 0 means block indefinitely. - pub fn clone_timeout(&self, timeout_ms: u32) -> Result { - Self::new(self.connection.clone(), timeout_ms) - } - - pub fn connection(&self) -> &DbConnectionInfo { - &self.connection - } - - pub fn del(&self, key: &str) -> Result { - let key = cstr(key); - let status = unsafe { swss_try!(p_status => SWSSDBConnector_del(self.ptr, key.as_ptr(), p_status))? }; - Ok(status == 1) - } - - pub fn set(&self, key: &str, val: &CxxStr) -> Result<()> { - let key = cstr(key); - unsafe { swss_try!(SWSSDBConnector_set(self.ptr, key.as_ptr(), val.as_raw())) } - } - - pub fn get(&self, key: &str) -> Result> { - let key = cstr(key); - unsafe { - let ans = swss_try!(p_ans => SWSSDBConnector_get(self.ptr, key.as_ptr(), p_ans))?; - Ok(CxxString::take(ans)) - } - } - - pub fn exists(&self, key: &str) -> Result { - let key = cstr(key); - let status = unsafe { swss_try!(p_status => SWSSDBConnector_exists(self.ptr, key.as_ptr(), p_status))? }; - Ok(status == 1) - } - - pub fn hdel(&self, key: &str, field: &str) -> Result { - let key = cstr(key); - let field = cstr(field); - let status = - unsafe { swss_try!(p_status => SWSSDBConnector_hdel(self.ptr, key.as_ptr(), field.as_ptr(), p_status))? }; - Ok(status == 1) - } - - pub fn hset(&self, key: &str, field: &str, val: &CxxStr) -> Result<()> { - let key = cstr(key); - let field = cstr(field); - unsafe { - swss_try!(SWSSDBConnector_hset( - self.ptr, - key.as_ptr(), - field.as_ptr(), - val.as_raw(), - )) - } - } - - pub fn hget(&self, key: &str, field: &str) -> Result> { - let key = cstr(key); - let field = cstr(field); - unsafe { - let ans = swss_try!(p_ans => SWSSDBConnector_hget(self.ptr, key.as_ptr(), field.as_ptr(), p_ans))?; - Ok(CxxString::take(ans)) - } - } - - pub fn hgetall(&self, key: &str) -> Result> { - let key = cstr(key); - unsafe { - let arr = swss_try!(p_arr => SWSSDBConnector_hgetall(self.ptr, key.as_ptr(), p_arr))?; - Ok(take_field_value_array(arr)) - } - } - - pub fn hexists(&self, key: &str, field: &str) -> Result { - let key = cstr(key); - let field = cstr(field); - let status = unsafe { - swss_try!(p_status => SWSSDBConnector_hexists(self.ptr, key.as_ptr(), field.as_ptr(), p_status))? - }; - Ok(status == 1) - } - - pub fn flush_db(&self) -> Result { - let status = unsafe { swss_try!(p_status => SWSSDBConnector_flushdb(self.ptr, p_status))? }; - Ok(status == 1) - } -} - -impl Clone for DbConnector { - /// Clone with a default timeout of 15 seconds. - /// - /// 15 seconds was picked as an absurdly long time to wait for Redis to respond. - /// Panics after a timeout, or if any other exception occurred. - /// Use `clone_timeout` for control of timeout and exception handling. - fn clone(&self) -> Self { - self.clone_timeout(15000).expect("DbConnector::clone failed") - } -} - -impl Drop for DbConnector { - fn drop(&mut self) { - unsafe { swss_try!(SWSSDBConnector_free(self.ptr)).expect("Dropping DbConnector") }; - } -} - -unsafe impl Send for DbConnector {} - -#[cfg(feature = "async")] -impl DbConnector { - async_util::impl_basic_async_method!(new_async <= new(connection: DbConnectionInfo, timeout_ms: u32) -> Result); - async_util::impl_basic_async_method!(new_named_async <= new_named(db_name: &str, is_tcp_conn: bool, timeout_ms: u32) -> Result); - async_util::impl_basic_async_method!(new_tcp_async <= new_tcp(db_id: i32, hostname: &str, port: u16, timeout_ms: u32) -> Result); - async_util::impl_basic_async_method!(new_unix_async <= new_unix(db_id: i32, sock_path: &str, timeout_ms: u32) -> Result); - async_util::impl_basic_async_method!(new_keyed_async <= new_keyed(db_name: &str, is_tcp_conn: bool, timeout_ms: u32, container_name: &str, netns: &str) -> Result); - async_util::impl_basic_async_method!(clone_timeout_async <= clone_timeout(&self, timeout_ms: u32) -> Result); - async_util::impl_basic_async_method!(del_async <= del(&self, key: &str) -> Result); - async_util::impl_basic_async_method!(set_async <= set(&self, key: &str, value: &CxxStr) -> Result<()>); - async_util::impl_basic_async_method!(get_async <= get(&self, key: &str) -> Result>); - async_util::impl_basic_async_method!(exists_async <= exists(&self, key: &str) -> Result); - async_util::impl_basic_async_method!(hdel_async <= hdel(&self, key: &str, field: &str) -> Result); - async_util::impl_basic_async_method!(hset_async <= hset(&self, key: &str, field: &str, value: &CxxStr) -> Result<()>); - async_util::impl_basic_async_method!(hget_async <= hget(&self, key: &str, field: &str) -> Result>); - async_util::impl_basic_async_method!(hgetall_async <= hgetall(&self, key: &str) -> Result>); - async_util::impl_basic_async_method!(hexists_async <= hexists(&self, key: &str, field: &str) -> Result); - async_util::impl_basic_async_method!(flush_db_async <= flush_db(&self) -> Result); - async_util::impl_basic_async_method!(clone_async <= clone(&self) -> Self); -} diff --git a/crates/swss-common/src/types/exception.rs b/crates/swss-common/src/types/exception.rs deleted file mode 100644 index bc7a611..0000000 --- a/crates/swss-common/src/types/exception.rs +++ /dev/null @@ -1,79 +0,0 @@ -use super::*; -use crate::bindings::*; - -/// Rust version of an `SWSSResult`. -/// -/// When an `SWSSResult` is success/`SWSSException_None`, the rust function will return `Ok(..)`. -/// Otherwise, the rust function will return `Err(Exception)`. -pub type Result = std::result::Result; - -/// "Try" an `SWSSResult`, or a call that produces an `SWSSResult`, and convert it into a rust `Result`. -macro_rules! swss_try { - // Convert an SWSSResult to a Result<(), Exception> - ($result:expr) => {{ - let res = $result; - if res.exception == $crate::bindings::SWSSException_SWSSException_None { - Ok(()) - } else { - Err($crate::types::Exception::take(res)) - } - }}; - - // Call a function that takes an output pointer, then convert its result to a Result<(), Exception> - ($out_ptr:ident => $result:expr) => {{ - let mut out = ::std::mem::MaybeUninit::uninit(); - let $out_ptr = out.as_mut_ptr(); - let res = $result; - if res.exception == $crate::bindings::SWSSException_SWSSException_None { - Ok(out.assume_init()) - } else { - Err($crate::types::Exception::take(res)) - } - }}; -} -pub(crate) use swss_try; - -/// Rust version of the body of a failed `SWSSResult`. -#[derive(Debug, Clone)] -pub struct Exception { - message: String, - location: String, -} - -impl Exception { - /// Construct an `Exception` from the details of a failed `SWSSResult`. - pub(crate) unsafe fn take(res: SWSSResult) -> Self { - assert_ne!( - res.exception, SWSSException_SWSSException_None, - "Exception::take() on non-exception SWSSResult: {res:#?}" - ); - Self { - message: CxxString::take(res.message) - .expect("SWSSResult missing message") - .to_string_lossy() - .into_owned(), - location: CxxString::take(res.location) - .expect("SWSSResult missing location") - .to_string_lossy() - .into_owned(), - } - } - - /// Get an informational string about the error that occurred. - pub fn message(&self) -> &str { - &self.message - } - - /// Get an informational string about the where in the code the error occurred. - pub fn location(&self) -> &str { - &self.location - } -} - -impl Display for Exception { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "[{}] {}", self.location, self.message) - } -} - -impl Error for Exception {} diff --git a/crates/swss-common/src/types/logger.rs b/crates/swss-common/src/types/logger.rs deleted file mode 100644 index 6555ec8..0000000 --- a/crates/swss-common/src/types/logger.rs +++ /dev/null @@ -1,92 +0,0 @@ -use super::*; -use getset::{Getters, Setters}; -use lazy_static::lazy_static; -use std::ffi::c_char; -use std::sync::Mutex; - -lazy_static! { - static ref LOGGER: Mutex = Mutex::new(Logger::default()); -} - -pub trait LoggerConfigChangeHandler: Send { - fn on_log_level_change(&mut self, level: &str); - fn on_log_output_change(&mut self, output: &str); -} - -#[derive(Getters, Setters)] -pub struct Logger { - #[getset(get = "pub", set)] - level: String, - #[getset(get = "pub", set)] - output: String, - #[getset(skip)] - handler: Option>, -} - -impl Default for Logger { - fn default() -> Self { - Self { - level: "INFO".to_string(), - output: "STDOUT".to_string(), - handler: None, - } - } -} - -pub fn link_to_swsscommon_logger(db_name: &str, logger_change_handler: T) -> Result<()> -where - T: LoggerConfigChangeHandler + 'static, -{ - { - let mut logger = LOGGER.lock().unwrap(); - logger.handler = Some(Box::new(logger_change_handler)); - } - - let db_name = cstr(db_name); - let def_level = cstr("INFO"); - let def_output = cstr("SYSLOG"); - // link to config db using the specified db_name. It will trigger immediate - // callback to set the current log level and output - unsafe { - swss_try!(SWSSLogger_linkToDbWithOutput( - db_name.as_ptr(), - Some(priority_change_notify), - def_level.as_ptr(), - Some(output_change_notify), - def_output.as_ptr(), - ))?; - // Start logger thread to handle log update from config db - swss_try!(SWSSLogger_restartLogger())?; - Ok(()) - } -} - -extern "C" fn priority_change_notify(_component: *const c_char, priority: *const c_char) { - let priority = unsafe { CStr::from_ptr(priority) }; - let priority = priority.to_str().unwrap(); - - let mut logger = LOGGER.lock().unwrap(); - logger.handler.as_mut().unwrap().as_mut().on_log_level_change(priority); - - logger.set_level(priority.to_string()); - - println!("Priority set to: {priority}"); -} - -extern "C" fn output_change_notify(_component: *const c_char, output: *const c_char) { - let output = unsafe { CStr::from_ptr(output) }; - let output = output.to_str().unwrap(); - - let mut logger = LOGGER.lock().unwrap(); - logger.handler.as_mut().unwrap().as_mut().on_log_output_change(output); - logger.set_output(output.to_string()); - println!("Output set to: {output}"); -} - -pub fn log_level() -> String { - LOGGER.lock().unwrap().level().to_string() -} - -pub fn log_output() -> String { - LOGGER.lock().unwrap().output().to_string() -} diff --git a/crates/swss-common/src/types/producerstatetable.rs b/crates/swss-common/src/types/producerstatetable.rs deleted file mode 100644 index 6bfbbe6..0000000 --- a/crates/swss-common/src/types/producerstatetable.rs +++ /dev/null @@ -1,78 +0,0 @@ -use super::*; -use crate::bindings::*; - -/// Rust wrapper around `swss::ProducerStateTable`. -#[derive(Debug)] -pub struct ProducerStateTable { - ptr: SWSSProducerStateTable, - _db: DbConnector, -} - -impl ProducerStateTable { - pub fn new(db: DbConnector, table_name: &str) -> Result { - let table_name = cstr(table_name); - let ptr = unsafe { swss_try!(p_pst => SWSSProducerStateTable_new(db.ptr, table_name.as_ptr(), p_pst))? }; - Ok(Self { ptr, _db: db }) - } - - pub fn set_buffered(&self, buffered: bool) -> Result<()> { - unsafe { swss_try!(SWSSProducerStateTable_setBuffered(self.ptr, buffered as u8)) } - } - - pub fn set(&self, key: &str, fvs: I) -> Result<()> - where - I: IntoIterator, - F: AsRef<[u8]>, - V: Into, - { - let key = cstr(key); - let (arr, _k) = make_field_value_array(fvs); - unsafe { swss_try!(SWSSProducerStateTable_set(self.ptr, key.as_ptr(), arr)) } - } - - pub fn del(&self, key: &str) -> Result<()> { - let key = cstr(key); - unsafe { swss_try!(SWSSProducerStateTable_del(self.ptr, key.as_ptr())) } - } - - pub fn flush(&self) -> Result<()> { - unsafe { swss_try!(SWSSProducerStateTable_flush(self.ptr)) } - } - - pub fn count(&self) -> Result { - unsafe { swss_try!(p_count => SWSSProducerStateTable_count(self.ptr, p_count)) } - } - - pub fn clear(&self) -> Result<()> { - unsafe { swss_try!(SWSSProducerStateTable_clear(self.ptr)) } - } - - pub fn create_temp_view(&self) -> Result<()> { - unsafe { swss_try!(SWSSProducerStateTable_create_temp_view(self.ptr)) } - } - - pub fn apply_temp_view(&self) -> Result<()> { - unsafe { swss_try!(SWSSProducerStateTable_apply_temp_view(self.ptr)) } - } -} - -impl Drop for ProducerStateTable { - fn drop(&mut self) { - unsafe { swss_try!(SWSSProducerStateTable_free(self.ptr)).expect("Dropping ProducerStateTable") }; - } -} - -unsafe impl Send for ProducerStateTable {} - -#[cfg(feature = "async")] -impl ProducerStateTable { - async_util::impl_basic_async_method!( - set_async <= set(&self, key: &str, fvs: I) -> Result<()> - where - I: IntoIterator + Send, - F: AsRef<[u8]>, - V: Into, - ); - async_util::impl_basic_async_method!(del_async <= del(&self, key: &str) -> Result<()>); - async_util::impl_basic_async_method!(flush_async <= flush(&self) -> Result<()>); -} diff --git a/crates/swss-common/src/types/subscriberstatetable.rs b/crates/swss-common/src/types/subscriberstatetable.rs deleted file mode 100644 index c72eae3..0000000 --- a/crates/swss-common/src/types/subscriberstatetable.rs +++ /dev/null @@ -1,85 +0,0 @@ -use super::*; -use crate::bindings::*; -use std::{os::fd::BorrowedFd, ptr::null, time::Duration}; - -/// Rust wrapper around `swss::SubscriberStateTable`. -#[derive(Debug)] -pub struct SubscriberStateTable { - ptr: SWSSSubscriberStateTable, - db: DbConnector, - table_name: String, -} - -impl SubscriberStateTable { - pub fn new(db: DbConnector, table_name: &str, pop_batch_size: Option, pri: Option) -> Result { - let table_name_string = String::from(table_name); - let table_name = cstr(table_name); - let pop_batch_size = pop_batch_size.as_ref().map(|n| n as *const i32).unwrap_or(null()); - let pri = pri.as_ref().map(|n| n as *const i32).unwrap_or(null()); - let ptr = unsafe { - swss_try!(p_sst => { - SWSSSubscriberStateTable_new(db.ptr, table_name.as_ptr(), pop_batch_size, pri, p_sst) - })? - }; - Ok(Self { - ptr, - db, - table_name: table_name_string, - }) - } - - pub fn pops(&self) -> Result> { - unsafe { - let arr = swss_try!(p_arr => SWSSSubscriberStateTable_pops(self.ptr, p_arr))?; - Ok(take_key_op_field_values_array(arr)) - } - } - - pub fn read_data(&self, timeout: Duration, interrupt_on_signal: bool) -> Result { - let timeout_ms = timeout.as_millis().try_into().unwrap(); - let res = unsafe { - swss_try!(p_res => { - SWSSSubscriberStateTable_readData(self.ptr, timeout_ms, interrupt_on_signal as u8, p_res) - })? - }; - Ok(SelectResult::from_raw(res)) - } - - pub fn get_fd(&self) -> Result { - // SAFETY: This fd represents the underlying redis connection, which should stay alive - // as long as the DbConnector does. - unsafe { - let fd = swss_try!(p_fd => SWSSSubscriberStateTable_getFd(self.ptr, p_fd))?; - let fd = BorrowedFd::borrow_raw(fd.try_into().unwrap()); - Ok(fd) - } - } - - pub fn db_connector(&self) -> &DbConnector { - &self.db - } - - pub fn db_connector_mut(&mut self) -> &mut DbConnector { - &mut self.db - } - - pub fn table_name(&self) -> &str { - &self.table_name - } -} - -impl Drop for SubscriberStateTable { - fn drop(&mut self) { - unsafe { swss_try!(SWSSSubscriberStateTable_free(self.ptr)).expect("Dropping SubscriberStateTable") }; - } -} - -unsafe impl Send for SubscriberStateTable {} - -/// Async versions of methods -#[cfg(feature = "async")] -impl SubscriberStateTable { - async_util::impl_read_data_async!(); - async_util::impl_basic_async_method!(new_async <= new(db: DbConnector, table_name: &str, pop_batch_size: Option, pri: Option) -> Result); - async_util::impl_basic_async_method!(pops_async <= pops(&self) -> Result>); -} diff --git a/crates/swss-common/src/types/table.rs b/crates/swss-common/src/types/table.rs deleted file mode 100644 index 6aec3af..0000000 --- a/crates/swss-common/src/types/table.rs +++ /dev/null @@ -1,119 +0,0 @@ -use super::*; -use crate::bindings::*; -use std::ptr; - -/// Rust wrapper around `swss::Table`. -#[derive(Debug)] -pub struct Table { - name: String, - ptr: SWSSTable, - _db: DbConnector, -} - -impl Table { - pub fn new(db: DbConnector, table_name: &str) -> Result { - let table_name_copy = table_name.to_string(); - let table_name = cstr(table_name); - let ptr = unsafe { swss_try!(p_tbl => SWSSTable_new(db.ptr, table_name.as_ptr(), p_tbl))? }; - Ok(Self { - name: table_name_copy, - ptr, - _db: db, - }) - } - - pub fn get(&self, key: &str) -> Result> { - let key = cstr(key); - let mut arr = SWSSFieldValueArray { - len: 0, - data: ptr::null_mut(), - }; - let exists = unsafe { swss_try!(p_exists => SWSSTable_get(self.ptr, key.as_ptr(), &mut arr, p_exists))? }; - let maybe_fvs = if exists == 1 { - Some(unsafe { take_field_value_array(arr) }) - } else { - None - }; - Ok(maybe_fvs) - } - - pub fn hget(&self, key: &str, field: &str) -> Result> { - let key = cstr(key); - let field = cstr(field); - let mut val: SWSSString = ptr::null_mut(); - let exists = unsafe { - swss_try!(p_exists => SWSSTable_hget(self.ptr, key.as_ptr(), field.as_ptr(), &mut val, p_exists))? - }; - let maybe_fvs = if exists == 1 { - Some(unsafe { CxxString::take(val).unwrap() }) - } else { - None - }; - Ok(maybe_fvs) - } - - pub fn set(&self, key: &str, fvs: I) -> Result<()> - where - I: IntoIterator, - F: AsRef<[u8]>, - V: Into, - { - let key = cstr(key); - let (arr, _k) = make_field_value_array(fvs); - unsafe { swss_try!(SWSSTable_set(self.ptr, key.as_ptr(), arr)) } - } - - pub fn hset(&self, key: &str, field: &str, val: &CxxStr) -> Result<()> { - let key = cstr(key); - let field = cstr(field); - unsafe { swss_try!(SWSSTable_hset(self.ptr, key.as_ptr(), field.as_ptr(), val.as_raw())) } - } - - pub fn del(&self, key: &str) -> Result<()> { - let key = cstr(key); - unsafe { swss_try!(SWSSTable_del(self.ptr, key.as_ptr())) } - } - - pub fn hdel(&self, key: &str, field: &str) -> Result<()> { - let key = cstr(key); - let field = cstr(field); - unsafe { swss_try!(SWSSTable_hdel(self.ptr, key.as_ptr(), field.as_ptr())) } - } - - pub fn get_keys(&self) -> Result> { - unsafe { - let arr = swss_try!(p_arr => SWSSTable_getKeys(self.ptr, p_arr))?; - Ok(take_string_array(arr)) - } - } - - pub fn get_name(&self) -> &str { - &self.name - } -} - -impl Drop for Table { - fn drop(&mut self) { - unsafe { swss_try!(SWSSTable_free(self.ptr)).expect("Dropping Table") }; - } -} - -unsafe impl Send for Table {} - -#[cfg(feature = "async")] -impl Table { - async_util::impl_basic_async_method!(new_async <= new(db: DbConnector, table_name: &str) -> Result); - async_util::impl_basic_async_method!(get_async <= get(&self, key: &str) -> Result>); - async_util::impl_basic_async_method!(hget_async <= hget(&self, key: &str, field: &str) -> Result>); - async_util::impl_basic_async_method!( - set_async <= set(&self, key: &str, fvs: I) -> Result<()> - where - I: IntoIterator + Send, - F: AsRef<[u8]>, - V: Into, - ); - async_util::impl_basic_async_method!(hset_async <= hset(&self, key: &str, field: &str, value: &CxxStr) -> Result<()>); - async_util::impl_basic_async_method!(del_async <= del(&self, key: &str) -> Result<()>); - async_util::impl_basic_async_method!(hdel_async <= hdel(&self, key: &str, field: &str) -> Result<()>); - async_util::impl_basic_async_method!(get_keys_async <= get_keys(&self) -> Result>); -} diff --git a/crates/swss-common/src/types/zmqclient.rs b/crates/swss-common/src/types/zmqclient.rs deleted file mode 100644 index 489c10e..0000000 --- a/crates/swss-common/src/types/zmqclient.rs +++ /dev/null @@ -1,61 +0,0 @@ -use super::*; -use crate::bindings::*; - -/// Rust wrapper around `swss::ZmqClient`. -#[derive(Debug)] -pub struct ZmqClient { - pub(crate) ptr: SWSSZmqClient, -} - -impl ZmqClient { - pub fn new(endpoint: &str) -> Result { - let endpoint = cstr(endpoint); - let ptr = unsafe { swss_try!(p_zc => SWSSZmqClient_new(endpoint.as_ptr(), p_zc))? }; - Ok(Self { ptr }) - } - - pub fn is_connected(&self) -> Result { - let status = unsafe { swss_try!(p_status => SWSSZmqClient_isConnected(self.ptr, p_status))? }; - Ok(status == 1) - } - - pub fn connect(&self) -> Result<()> { - unsafe { swss_try!(SWSSZmqClient_connect(self.ptr)) } - } - - pub fn send_msg(&self, db_name: &str, table_name: &str, kfvs: I) -> Result<()> - where - I: IntoIterator, - { - let db_name = cstr(db_name); - let table_name = cstr(table_name); - let (kfvs, _k) = make_key_op_field_values_array(kfvs); - unsafe { - swss_try!(SWSSZmqClient_sendMsg( - self.ptr, - db_name.as_ptr(), - table_name.as_ptr(), - kfvs, - )) - } - } -} - -impl Drop for ZmqClient { - fn drop(&mut self) { - unsafe { swss_try!(SWSSZmqClient_free(self.ptr)).expect("Dropping ZmqClient") }; - } -} - -unsafe impl Send for ZmqClient {} - -#[cfg(feature = "async")] -impl ZmqClient { - async_util::impl_basic_async_method!(new_async <= new(endpoint: &str) -> Result); - async_util::impl_basic_async_method!(connect_async <= connect(&self) -> Result<()>); - async_util::impl_basic_async_method!( - send_msg_async <= send_msg(&self, db_name: &str, table_name: &str, kfvs: I) -> Result<()> - where - I: IntoIterator + Send, - ); -} diff --git a/crates/swss-common/src/types/zmqconsumerstatetable.rs b/crates/swss-common/src/types/zmqconsumerstatetable.rs deleted file mode 100644 index 0488b76..0000000 --- a/crates/swss-common/src/types/zmqconsumerstatetable.rs +++ /dev/null @@ -1,92 +0,0 @@ -use super::*; -use crate::bindings::*; -use std::{os::fd::BorrowedFd, ptr::null, sync::Arc, time::Duration}; - -/// Rust wrapper around `swss::ZmqConsumerStateTable`. -#[derive(Debug)] -pub struct ZmqConsumerStateTable { - ptr: SWSSZmqConsumerStateTable, - _db: DbConnector, - - /// See [`DropGuard`] and [`ZmqServer`]. - _drop_guard: Arc, -} - -impl ZmqConsumerStateTable { - pub fn new( - db: DbConnector, - table_name: &str, - zmqs: &mut ZmqServer, - pop_batch_size: Option, - pri: Option, - ) -> Result { - let table_name = cstr(table_name); - let pop_batch_size = pop_batch_size.as_ref().map(|n| n as *const i32).unwrap_or(null()); - let pri = pri.as_ref().map(|n| n as *const i32).unwrap_or(null()); - let ptr = unsafe { - swss_try!(p_zs => { - SWSSZmqConsumerStateTable_new(db.ptr, table_name.as_ptr(), zmqs.ptr, pop_batch_size, pri, p_zs) - })? - }; - let drop_guard = Arc::new(DropGuard(ptr)); - zmqs.register_consumer_state_table(drop_guard.clone()); - Ok(Self { - ptr, - _db: db, - _drop_guard: drop_guard, - }) - } - - pub fn pops(&self) -> Result> { - unsafe { - let arr = swss_try!(p_arr => SWSSZmqConsumerStateTable_pops(self.ptr, p_arr))?; - Ok(take_key_op_field_values_array(arr)) - } - } - - pub fn get_fd(&self) -> Result { - // SAFETY: This fd represents the underlying zmq socket, which should remain alive as long as there - // is a listener (i.e. a ZmqConsumerStateTable) - unsafe { - let fd = swss_try!(p_fd => SWSSZmqConsumerStateTable_getFd(self.ptr, p_fd))?; - let fd = BorrowedFd::borrow_raw(fd.try_into().unwrap()); - Ok(fd) - } - } - - pub fn read_data(&self, timeout: Duration, interrupt_on_signal: bool) -> Result { - let timeout_ms = timeout.as_millis().try_into().unwrap(); - let res = unsafe { - swss_try!(p_res => - SWSSZmqConsumerStateTable_readData(self.ptr, timeout_ms, interrupt_on_signal as u8, p_res) - )? - }; - Ok(SelectResult::from_raw(res)) - } -} - -unsafe impl Send for ZmqConsumerStateTable {} - -/// A type that will free the underlying `ZmqConsumerStateTable` when it is dropped. -/// This is shared with `ZmqServer` -#[derive(Debug)] -pub(crate) struct DropGuard(SWSSZmqConsumerStateTable); - -impl Drop for DropGuard { - fn drop(&mut self) { - unsafe { swss_try!(SWSSZmqConsumerStateTable_free(self.0)).expect("Dropping ZmqConsumerStateTable") }; - } -} - -// SAFETY: This is safe as long as ZmqConsumerStateTable is Send -unsafe impl Send for DropGuard {} - -// SAFETY: There is no way to use &DropGuard so it is safe -unsafe impl Sync for DropGuard {} - -/// Async versions of methods -#[cfg(feature = "async")] -impl ZmqConsumerStateTable { - async_util::impl_read_data_async!(); - async_util::impl_basic_async_method!(pops_async <= pops(&self) -> Result>); -} diff --git a/crates/swss-common/src/types/zmqproducerstatetable.rs b/crates/swss-common/src/types/zmqproducerstatetable.rs deleted file mode 100644 index 9ee69f3..0000000 --- a/crates/swss-common/src/types/zmqproducerstatetable.rs +++ /dev/null @@ -1,67 +0,0 @@ -use super::*; -use crate::bindings::*; - -/// Rust wrapper around `swss::ZmqProducerStateTable`. -#[derive(Debug)] -pub struct ZmqProducerStateTable { - ptr: SWSSZmqProducerStateTable, - _db: DbConnector, - _zmqc: ZmqClient, -} - -impl ZmqProducerStateTable { - pub fn new(db: DbConnector, table_name: &str, zmqc: ZmqClient, db_persistence: bool) -> Result { - let table_name = cstr(table_name); - let db_persistence = db_persistence as u8; - let ptr = unsafe { - swss_try!(p_zpst => - SWSSZmqProducerStateTable_new(db.ptr, table_name.as_ptr(), zmqc.ptr, db_persistence, p_zpst) - )? - }; - Ok(Self { - ptr, - _db: db, - _zmqc: zmqc, - }) - } - - pub fn set(&self, key: &str, fvs: I) -> Result<()> - where - I: IntoIterator, - F: AsRef<[u8]>, - V: Into, - { - let key = cstr(key); - let (arr, _k) = make_field_value_array(fvs); - unsafe { swss_try!(SWSSZmqProducerStateTable_set(self.ptr, key.as_ptr(), arr)) } - } - - pub fn del(&self, key: &str) -> Result<()> { - let key = cstr(key); - unsafe { swss_try!(SWSSZmqProducerStateTable_del(self.ptr, key.as_ptr())) } - } - - pub fn db_updater_queue_size(&self) -> Result { - unsafe { swss_try!(p_size => SWSSZmqProducerStateTable_dbUpdaterQueueSize(self.ptr, p_size)) } - } -} - -impl Drop for ZmqProducerStateTable { - fn drop(&mut self) { - unsafe { SWSSZmqProducerStateTable_free(self.ptr) }; - } -} - -unsafe impl Send for ZmqProducerStateTable {} - -#[cfg(feature = "async")] -impl ZmqProducerStateTable { - async_util::impl_basic_async_method!( - set_async <= set(&self, key: &str, fvs: I) -> Result<()> - where - I: IntoIterator + Send, - F: AsRef<[u8]>, - V: Into, - ); - async_util::impl_basic_async_method!(del_async <= del(&self, key: &str) -> Result<()>); -} diff --git a/crates/swss-common/src/types/zmqserver.rs b/crates/swss-common/src/types/zmqserver.rs deleted file mode 100644 index a8c7aeb..0000000 --- a/crates/swss-common/src/types/zmqserver.rs +++ /dev/null @@ -1,39 +0,0 @@ -use super::*; -use crate::bindings::*; -use std::sync::Arc; - -/// Rust wrapper around `swss::ZmqServer`. -#[derive(Debug)] -pub struct ZmqServer { - pub(crate) ptr: SWSSZmqServer, - - /// The types that register message handlers with a ZmqServer must be kept alive until - /// the server thread dies, otherwise we risk the server thread calling methods on deleted objects. - /// - /// Currently this is just ZmqConsumerStateTable, but in the future there may be other types added - /// and this vec will need to hold an enum of the possible message handlers. - message_handler_guards: Vec>, -} - -impl ZmqServer { - pub fn new(endpoint: &str) -> Result { - let endpoint = cstr(endpoint); - let ptr = unsafe { swss_try!(p_zs => SWSSZmqServer_new(endpoint.as_ptr(), p_zs))? }; - Ok(Self { - ptr, - message_handler_guards: Vec::new(), - }) - } - - pub(crate) fn register_consumer_state_table(&mut self, tbl_dg: Arc) { - self.message_handler_guards.push(tbl_dg); - } -} - -impl Drop for ZmqServer { - fn drop(&mut self) { - unsafe { SWSSZmqServer_free(self.ptr) }; - } -} - -unsafe impl Send for ZmqServer {} diff --git a/crates/swss-common/tests/async.rs b/crates/swss-common/tests/async.rs deleted file mode 100644 index 1c23d44..0000000 --- a/crates/swss-common/tests/async.rs +++ /dev/null @@ -1,220 +0,0 @@ -#![cfg(feature = "async")] -use paste::paste; -use std::{collections::HashMap, time::Duration}; -use swss_common::*; -use swss_common_testing::*; - -async fn timeout(timeout_ms: u32, fut: F) -> F::Output { - tokio::time::timeout(Duration::from_millis(timeout_ms.into()), fut) - .await - .expect("timed out") -} - -// This macro verifies that async test functions are Send. tokio::test is a bit misleading because -// a tokio runtime's root future can be !Send. This takes a test function and defines two tests from -// it - one that is the root future (Send not required), and one that uses a type assertion to check -// that the tested future is Send. -macro_rules! define_tokio_test_fns { - ($f:ident) => { - paste! { - #[tokio::test] - async fn [< $f _ >]() { - $f().await.unwrap(); - } - - fn [< _assert_ $f _is_send >]() { - fn assert_is_send(_: T) {} - assert_is_send($f()); - } - } - }; -} - -define_tokio_test_fns!(dbconnector_async_api_basic_test); -async fn dbconnector_async_api_basic_test() -> Result<(), Exception> { - let redis = Redis::start(); - let mut db = redis.db_connector(); - - drop(db.clone_timeout_async(10000).await?); - - assert!(db.flush_db_async().await?); - - let random = random_cxx_string(); - - db.set_async("hello", &CxxString::new("hello, world!")).await?; - db.set_async("random", &random).await?; - assert_eq!(db.get_async("hello").await?.unwrap(), "hello, world!"); - assert_eq!(db.get_async("random").await?.unwrap(), random); - assert_eq!(db.get_async("noexist").await?, None); - - assert!(db.exists_async("hello").await?); - assert!(!db.exists_async("noexist").await?); - assert!(db.del_async("hello").await?); - assert!(!db.del_async("hello").await?); - assert!(db.del_async("random").await?); - assert!(!db.del_async("random").await?); - assert!(!db.del_async("noexist").await?); - - db.hset_async("a", "hello", &CxxString::new("hello, world!")).await?; - db.hset_async("a", "random", &random).await?; - assert_eq!(db.hget_async("a", "hello").await?.unwrap(), "hello, world!"); - assert_eq!(db.hget_async("a", "random").await?.unwrap(), random); - assert_eq!(db.hget_async("a", "noexist").await?, None); - assert_eq!(db.hget_async("noexist", "noexist").await?, None); - assert!(db.hexists_async("a", "hello").await?); - assert!(!db.hexists_async("a", "noexist").await?); - assert!(!db.hexists_async("noexist", "hello").await?); - assert!(db.hdel_async("a", "hello").await?); - assert!(!db.hdel_async("a", "hello").await?); - assert!(db.hdel_async("a", "random").await?); - assert!(!db.hdel_async("a", "random").await?); - assert!(!db.hdel_async("a", "noexist").await?); - assert!(!db.hdel_async("noexist", "noexist").await?); - assert!(!db.del_async("a").await?); - - assert!(db.hgetall_async("a").await?.is_empty()); - db.hset_async("a", "a", &CxxString::new("1")).await?; - db.hset_async("a", "b", &CxxString::new("2")).await?; - db.hset_async("a", "c", &CxxString::new("3")).await?; - assert_eq!( - db.hgetall_async("a").await?, - HashMap::from_iter([ - ("a".into(), "1".into()), - ("b".into(), "2".into()), - ("c".into(), "3".into()) - ]) - ); - - assert!(db.flush_db_async().await?); - - Ok(()) -} - -define_tokio_test_fns!(consumer_producer_state_tables_async_api_basic_test); -async fn consumer_producer_state_tables_async_api_basic_test() -> Result<(), Exception> { - let redis = Redis::start(); - let mut pst = ProducerStateTable::new(redis.db_connector(), "table_a")?; - let mut cst = ConsumerStateTable::new(redis.db_connector(), "table_a", None, None)?; - - assert!(cst.pops()?.is_empty()); - - let mut kfvs = random_kfvs(); - for (i, kfv) in kfvs.iter().enumerate() { - assert_eq!(pst.count()?, i as i64); - match kfv.operation { - KeyOperation::Set => pst.set_async(&kfv.key, kfv.field_values.clone()).await?, - KeyOperation::Del => pst.del_async(&kfv.key).await?, - } - } - - timeout(2000, cst.read_data_async()).await.unwrap(); - let mut kfvs_cst = cst.pops()?; - assert!(cst.pops()?.is_empty()); - - kfvs.sort_unstable(); - kfvs_cst.sort_unstable(); - assert_eq!(kfvs_cst.len(), kfvs.len()); - assert_eq!(kfvs_cst, kfvs); - - Ok(()) -} - -define_tokio_test_fns!(subscriber_state_table_async_api_basic_test); -async fn subscriber_state_table_async_api_basic_test() -> Result<(), Exception> { - let redis = Redis::start(); - let mut db = redis.db_connector(); - let mut sst = SubscriberStateTable::new(redis.db_connector(), "table_a", None, None)?; - assert!(sst.pops()?.is_empty()); - - db.hset_async("table_a:key_a", "field_a", &CxxString::new("value_a")) - .await?; - db.hset_async("table_a:key_a", "field_b", &CxxString::new("value_b")) - .await?; - timeout(300, sst.read_data_async()).await.unwrap(); - let mut kfvs = sst.pops()?; - - // SubscriberStateTable will pick up duplicate KeyOpFieldValues' after two SETs on the same - // key. I'm not actually sure if this is intended. - assert_eq!(kfvs.len(), 2); - assert_eq!(kfvs[0], kfvs[1]); - - assert!(sst.pops()?.is_empty()); - - let KeyOpFieldValues { - key, - operation, - field_values, - } = kfvs.pop().unwrap(); - - assert_eq!(key, "key_a"); - assert_eq!(operation, KeyOperation::Set); - assert_eq!( - field_values, - HashMap::from_iter([ - ("field_a".into(), "value_a".into()), - ("field_b".into(), "value_b".into()) - ]) - ); - - Ok(()) -} - -define_tokio_test_fns!(zmq_consumer_producer_state_table_async_api_basic_test); -async fn zmq_consumer_producer_state_table_async_api_basic_test() -> Result<(), Exception> { - let (endpoint, _delete) = random_zmq_endpoint(); - let mut zmqs = ZmqServer::new(&endpoint)?; - let zmqc = ZmqClient::new(&endpoint)?; - - let redis = Redis::start(); - let mut zpst = ZmqProducerStateTable::new(redis.db_connector(), "table_a", zmqc, false)?; - let mut zcst = ZmqConsumerStateTable::new(redis.db_connector(), "table_a", &mut zmqs, None, None)?; - - let kfvs = random_kfvs(); - for kfv in &kfvs { - match kfv.operation { - KeyOperation::Set => zpst.set_async(&kfv.key, kfv.field_values.clone()).await?, - KeyOperation::Del => zpst.del_async(&kfv.key).await?, - } - } - - let mut kfvs_seen = Vec::new(); - while kfvs_seen.len() != kfvs.len() { - timeout(2000, zcst.read_data_async()).await.unwrap(); - kfvs_seen.extend(zcst.pops()?); - } - assert_eq!(kfvs, kfvs_seen); - - Ok(()) -} - -define_tokio_test_fns!(table_async_api_basic_test); -async fn table_async_api_basic_test() -> Result<(), Exception> { - let redis = Redis::start(); - let mut table = Table::new_async(redis.db_connector(), "mytable").await?; - assert!(table.get_keys_async().await?.is_empty()); - assert!(table.get_async("mykey").await?.is_none()); - - let fvs = random_fvs(); - table.set_async("mykey", fvs.clone()).await?; - assert_eq!(table.get_keys_async().await?, &["mykey"]); - assert_eq!(table.get_async("mykey").await?.as_ref(), Some(&fvs)); - - let (field, value) = fvs.iter().next().unwrap(); - assert_eq!(table.hget_async("mykey", field).await?.as_ref(), Some(value)); - table.hdel_async("mykey", field).await?; - assert_eq!(table.hget_async("mykey", field).await?, None); - - table - .hset_async("mykey", field, &CxxString::from("my special value")) - .await?; - assert_eq!( - table.hget_async("mykey", field).await?.unwrap().as_bytes(), - b"my special value" - ); - - table.del_async("mykey").await?; - assert!(table.get_keys_async().await?.is_empty()); - assert!(table.get_async("mykey").await?.is_none()); - - Ok(()) -} diff --git a/crates/swss-common/tests/logger.rs b/crates/swss-common/tests/logger.rs deleted file mode 100644 index b27e1b2..0000000 --- a/crates/swss-common/tests/logger.rs +++ /dev/null @@ -1,49 +0,0 @@ -use lazy_static::lazy_static; -use std::sync::Mutex; -use std::thread::sleep; -use std::time::Duration; -use swss_common::*; -use swss_common_testing::*; - -lazy_static! { - static ref LEVEL: Mutex = Mutex::new("INFO".to_string()); - static ref OUTPUT: Mutex = Mutex::new("INFO".to_string()); -} -struct LoggerConfigHandlerForTest {} - -impl LoggerConfigChangeHandler for LoggerConfigHandlerForTest { - fn on_log_level_change(&mut self, level_str: &str) { - let mut level = LEVEL.lock().unwrap(); - *level = level_str.to_string(); - } - - fn on_log_output_change(&mut self, output_str: &str) { - let mut output = OUTPUT.lock().unwrap(); - *output = output_str.to_string(); - } -} -#[test] -fn logger_init_test() -> Result<(), Exception> { - let redis = Redis::start_config_db(); - // todo: change redis::db_connector to passing db_id as parameter - let db = DbConnector::new_unix(1, &redis.sock, 0)?; - - db.hset("LOGGER|test", "LOGLEVEL", &CxxString::new("NOTICE"))?; - db.hset("LOGGER|test", "LOGOUTPUT", &CxxString::new("SYSLOG"))?; - - // connect to swsscommon logger - let handler = LoggerConfigHandlerForTest {}; - link_to_swsscommon_logger("test", handler)?; - - assert_eq!("NOTICE", *LEVEL.lock().unwrap()); - assert_eq!("SYSLOG", *OUTPUT.lock().unwrap()); - - // test log level change - db.hset("LOGGER|test", "LOGLEVEL", &CxxString::new("CRIT"))?; - db.hset("LOGGER|test", "LOGOUTPUT", &CxxString::new("STDOUT"))?; - sleep(Duration::from_millis(500)); - - assert_eq!("CRIT", *LEVEL.lock().unwrap()); - assert_eq!("STDOUT", *OUTPUT.lock().unwrap()); - Ok(()) -} diff --git a/crates/swss-common/tests/logger_fallback.rs b/crates/swss-common/tests/logger_fallback.rs deleted file mode 100644 index 5e3a761..0000000 --- a/crates/swss-common/tests/logger_fallback.rs +++ /dev/null @@ -1,22 +0,0 @@ -use swss_common::*; - -struct LoggerConfigHandlerForTest {} - -impl LoggerConfigChangeHandler for LoggerConfigHandlerForTest { - #[allow(unused_variables)] - fn on_log_level_change(&mut self, level_str: &str) {} - - #[allow(unused_variables)] - fn on_log_output_change(&mut self, output_str: &str) {} -} - -// Test link_to_swsscommon_logger returning an error when Redis is not available. -// This test has to be run in a separate test suite because it can't run with other tests that use Redis. -#[test] -fn logger_init_without_redis() { - let handler = LoggerConfigHandlerForTest {}; - // connect to swsscommon logger - let result = link_to_swsscommon_logger("test", handler); - - assert!(result.is_err()); -} diff --git a/crates/swss-common/tests/sync.rs b/crates/swss-common/tests/sync.rs deleted file mode 100644 index a4f4081..0000000 --- a/crates/swss-common/tests/sync.rs +++ /dev/null @@ -1,264 +0,0 @@ -use std::{collections::HashMap, time::Duration}; -use swss_common::*; -use swss_common_testing::*; - -#[test] -fn dbconnector_sync_api_basic_test() -> Result<(), Exception> { - let redis = Redis::start(); - let db = redis.db_connector(); - - drop(db.clone_timeout(10000)); - - assert!(db.flush_db()?); - - let random = random_cxx_string(); - - db.set("hello", &CxxString::new("hello, world!"))?; - db.set("random", &random)?; - assert_eq!(db.get("hello")?.unwrap(), "hello, world!"); - assert_eq!(db.get("random")?.unwrap(), random); - assert_eq!(db.get("noexist")?, None); - - assert!(db.exists("hello")?); - assert!(!db.exists("noexist")?); - assert!(db.del("hello")?); - assert!(!db.del("hello")?); - assert!(db.del("random")?); - assert!(!db.del("random")?); - assert!(!db.del("noexist")?); - - db.hset("a", "hello", &CxxString::new("hello, world!"))?; - db.hset("a", "random", &random)?; - assert_eq!(db.hget("a", "hello")?.unwrap(), "hello, world!"); - assert_eq!(db.hget("a", "random")?.unwrap(), random); - assert_eq!(db.hget("a", "noexist")?, None); - assert_eq!(db.hget("noexist", "noexist")?, None); - assert!(db.hexists("a", "hello")?); - assert!(!db.hexists("a", "noexist")?); - assert!(!db.hexists("noexist", "hello")?); - assert!(db.hdel("a", "hello")?); - assert!(!db.hdel("a", "hello")?); - assert!(db.hdel("a", "random")?); - assert!(!db.hdel("a", "random")?); - assert!(!db.hdel("a", "noexist")?); - assert!(!db.hdel("noexist", "noexist")?); - assert!(!db.del("a")?); - - assert!(db.hgetall("a")?.is_empty()); - db.hset("a", "a", &CxxString::new("1"))?; - db.hset("a", "b", &CxxString::new("2"))?; - db.hset("a", "c", &CxxString::new("3"))?; - assert_eq!( - db.hgetall("a")?, - HashMap::from_iter([ - ("a".into(), "1".into()), - ("b".into(), "2".into()), - ("c".into(), "3".into()) - ]) - ); - - assert!(db.flush_db()?); - - Ok(()) -} - -#[test] -fn consumer_producer_state_tables_sync_api_basic_test() -> Result<(), Exception> { - sonic_db_config_init_for_test(); - let redis = Redis::start(); - let pst = ProducerStateTable::new(redis.db_connector(), "table_a")?; - let cst = ConsumerStateTable::new(redis.db_connector(), "table_a", None, None)?; - - assert!(cst.pops()?.is_empty()); - - let mut kfvs = random_kfvs(); - for (i, kfv) in kfvs.iter().enumerate() { - assert_eq!(pst.count()?, i as i64); - match kfv.operation { - KeyOperation::Set => pst.set(&kfv.key, kfv.field_values.clone())?, - KeyOperation::Del => pst.del(&kfv.key)?, - } - } - - assert_eq!(cst.read_data(Duration::from_millis(2000), true)?, SelectResult::Data); - let mut kfvs_cst = cst.pops()?; - assert!(cst.pops()?.is_empty()); - - kfvs.sort_unstable(); - kfvs_cst.sort_unstable(); - assert_eq!(kfvs_cst.len(), kfvs.len()); - assert_eq!(kfvs_cst, kfvs); - - Ok(()) -} - -#[test] -fn subscriber_state_table_sync_api_basic_test() -> Result<(), Exception> { - sonic_db_config_init_for_test(); - let redis = Redis::start(); - let db = redis.db_connector(); - let sst = SubscriberStateTable::new(redis.db_connector(), "table_a", None, None)?; - assert!(sst.pops()?.is_empty()); - - db.hset("table_a:key_a", "field_a", &CxxString::new("value_a"))?; - db.hset("table_a:key_a", "field_b", &CxxString::new("value_b"))?; - assert_eq!(sst.read_data(Duration::from_millis(300), true)?, SelectResult::Data); - let mut kfvs = sst.pops()?; - - // SubscriberStateTable will pick up duplicate KeyOpFieldValues' after two SETs on the same - // key. I'm not actually sure if this is intended. - assert_eq!(kfvs.len(), 2); - assert_eq!(kfvs[0], kfvs[1]); - - assert!(sst.pops()?.is_empty()); - - let KeyOpFieldValues { - key, - operation, - field_values, - } = kfvs.pop().unwrap(); - - assert_eq!(key, "key_a"); - assert_eq!(operation, KeyOperation::Set); - assert_eq!( - field_values, - HashMap::from_iter([ - ("field_a".into(), "value_a".into()), - ("field_b".into(), "value_b".into()) - ]) - ); - - Ok(()) -} - -#[test] -fn zmq_consumer_state_table_sync_api_basic_test() -> Result<(), Exception> { - use SelectResult::*; - - let (endpoint, _delete) = random_zmq_endpoint(); - let mut zmqs = ZmqServer::new(&endpoint)?; - let zmqc = ZmqClient::new(&endpoint)?; - assert!(zmqc.is_connected()?); - - let redis = Redis::start(); - let zcst_table_a = ZmqConsumerStateTable::new(redis.db_connector(), "table_a", &mut zmqs, None, None)?; - let zcst_table_b = ZmqConsumerStateTable::new(redis.db_connector(), "table_b", &mut zmqs, None, None)?; - - let kfvs = random_kfvs(); - - zmqc.send_msg("", "table_a", kfvs.clone())?; // db name is empty because we are using DbConnector::new_unix - assert_eq!(zcst_table_a.read_data(Duration::from_millis(1500), true)?, Data); - - zmqc.send_msg("", "table_b", kfvs.clone())?; - assert_eq!(zcst_table_b.read_data(Duration::from_millis(1500), true)?, Data); - - let kfvs_a = zcst_table_a.pops()?; - let kvfs_b = zcst_table_b.pops()?; - assert_eq!(kfvs_a, kvfs_b); - assert_eq!(kfvs, kfvs_a); - - Ok(()) -} - -#[test] -fn zmq_consumer_producer_state_tables_sync_api_basic_test() -> Result<(), Exception> { - use SelectResult::*; - - let (endpoint, _delete) = random_zmq_endpoint(); - let mut zmqs = ZmqServer::new(&endpoint)?; - let zmqc = ZmqClient::new(&endpoint)?; - - let redis = Redis::start(); - let zpst = ZmqProducerStateTable::new(redis.db_connector(), "table_a", zmqc, false)?; - let zcst = ZmqConsumerStateTable::new(redis.db_connector(), "table_a", &mut zmqs, None, None)?; - - let kfvs = random_kfvs(); - for kfv in &kfvs { - match kfv.operation { - KeyOperation::Set => zpst.set(&kfv.key, kfv.field_values.clone())?, - KeyOperation::Del => zpst.del(&kfv.key)?, - } - } - - let mut kfvs_seen = Vec::new(); - while kfvs_seen.len() != kfvs.len() { - assert_eq!(zcst.read_data(Duration::from_millis(2000), true)?, Data); - kfvs_seen.extend(zcst.pops()?); - } - assert_eq!(kfvs, kfvs_seen); - - Ok(()) -} - -// Below test covers 2 scenarios: -// 1. late connect when zmq server is started after client sending messages. -// 2. reconnect when zmq server is stopped and restarted. messages from client during -// the time should be queued by client and resent when server is restarted. -#[test] -fn zmq_consumer_producer_state_tables_sync_api_connect_late_reconnect() -> Result<(), Exception> { - use SelectResult::*; - enum TestPhase { - LateConnect, - Reconnect, - } - let (endpoint, _delete) = random_zmq_endpoint(); - - let zmqc = ZmqClient::new(&endpoint)?; - let redis = Redis::start(); - let zpst = ZmqProducerStateTable::new(redis.db_connector(), "table_a", zmqc, false)?; - - for _ in [TestPhase::LateConnect, TestPhase::Reconnect] { - let kfvs = random_kfvs(); - for kfv in &kfvs { - match kfv.operation { - KeyOperation::Set => zpst.set(&kfv.key, kfv.field_values.clone())?, - KeyOperation::Del => zpst.del(&kfv.key)?, - } - } - - let mut zmqs = ZmqServer::new(&endpoint)?; - let zcst = ZmqConsumerStateTable::new(redis.db_connector(), "table_a", &mut zmqs, None, None)?; - let mut kfvs_seen = Vec::new(); - while kfvs_seen.len() != kfvs.len() { - assert_eq!(zcst.read_data(Duration::from_millis(2000), true)?, Data); - kfvs_seen.extend(zcst.pops()?); - } - assert_eq!(kfvs, kfvs_seen); - drop(zcst); - drop(zmqs); - } - - Ok(()) -} - -#[test] -fn table_sync_api_basic_test() -> Result<(), Exception> { - let redis = Redis::start(); - let table = Table::new(redis.db_connector(), "mytable")?; - assert!(table.get_keys()?.is_empty()); - assert!(table.get("mykey")?.is_none()); - - let fvs = random_fvs(); - table.set("mykey", fvs.clone())?; - assert_eq!(table.get_keys()?, &["mykey"]); - assert_eq!(table.get("mykey")?.as_ref(), Some(&fvs)); - - let (field, value) = fvs.iter().next().unwrap(); - assert_eq!(table.hget("mykey", field)?.as_ref(), Some(value)); - table.hdel("mykey", field)?; - assert_eq!(table.hget("mykey", field)?, None); - - table.hset("mykey", field, &CxxString::from("my special value"))?; - assert_eq!(table.hget("mykey", field)?.unwrap().as_bytes(), b"my special value"); - - table.del("mykey")?; - assert!(table.get_keys()?.is_empty()); - assert!(table.get("mykey")?.is_none()); - - Ok(()) -} - -#[test] -fn expected_exceptions() { - DbConnector::new_tcp(0, "127.0.0.1", 1, 10000).unwrap_err(); -} diff --git a/crates/swss-serde/Cargo.toml b/crates/swss-serde/Cargo.toml index aa88af9..8da1e03 100644 --- a/crates/swss-serde/Cargo.toml +++ b/crates/swss-serde/Cargo.toml @@ -10,11 +10,11 @@ edition.workspace = true [dependencies] serde.workspace = true -swss-common = { path = "../swss-common" } +swss-common = { git = "https://github.com/sonic-net/sonic-swss-common.git", branch = "master" } serde_serializer_quick_unsupported = "0.1" [lints] workspace = true [dev-dependencies] -swss-common-testing = { path = "../swss-common-testing" } +swss-common-testing = { git = "https://github.com/sonic-net/sonic-swss-common.git", branch = "master" }