diff --git a/crates/swss-common/tests/sync.rs b/crates/swss-common/tests/sync.rs index f366c7f..a4f4081 100644 --- a/crates/swss-common/tests/sync.rs +++ b/crates/swss-common/tests/sync.rs @@ -190,6 +190,47 @@ fn zmq_consumer_producer_state_tables_sync_api_basic_test() -> Result<(), Except Ok(()) } +// Below test covers 2 scenarios: +// 1. late connect when zmq server is started after client sending messages. +// 2. reconnect when zmq server is stopped and restarted. messages from client during +// the time should be queued by client and resent when server is restarted. +#[test] +fn zmq_consumer_producer_state_tables_sync_api_connect_late_reconnect() -> Result<(), Exception> { + use SelectResult::*; + enum TestPhase { + LateConnect, + Reconnect, + } + let (endpoint, _delete) = random_zmq_endpoint(); + + let zmqc = ZmqClient::new(&endpoint)?; + let redis = Redis::start(); + let zpst = ZmqProducerStateTable::new(redis.db_connector(), "table_a", zmqc, false)?; + + for _ in [TestPhase::LateConnect, TestPhase::Reconnect] { + let kfvs = random_kfvs(); + for kfv in &kfvs { + match kfv.operation { + KeyOperation::Set => zpst.set(&kfv.key, kfv.field_values.clone())?, + KeyOperation::Del => zpst.del(&kfv.key)?, + } + } + + let mut zmqs = ZmqServer::new(&endpoint)?; + let zcst = ZmqConsumerStateTable::new(redis.db_connector(), "table_a", &mut zmqs, None, None)?; + let mut kfvs_seen = Vec::new(); + while kfvs_seen.len() != kfvs.len() { + assert_eq!(zcst.read_data(Duration::from_millis(2000), true)?, Data); + kfvs_seen.extend(zcst.pops()?); + } + assert_eq!(kfvs, kfvs_seen); + drop(zcst); + drop(zmqs); + } + + Ok(()) +} + #[test] fn table_sync_api_basic_test() -> Result<(), Exception> { let redis = Redis::start();