From fd6b76e08e5d60669c8ed483c30b4fe023eec455 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Fri, 18 Nov 2022 14:36:30 +0800 Subject: [PATCH 01/13] f Signed-off-by: CalvinNeo --- new-mock-engine-store/src/mock_cluster.rs | 3 + new-mock-engine-store/src/node.rs | 2 + proxy_scripts/ci_check.sh | 2 +- proxy_tests/proxy/config.rs | 6 +- proxy_tests/proxy/mod.rs | 1 + proxy_tests/proxy/normal.rs | 143 ---------------------- proxy_tests/proxy/proxy.rs | 6 +- proxy_tests/proxy/snapshot.rs | 4 + 8 files changed, 16 insertions(+), 151 deletions(-) diff --git a/new-mock-engine-store/src/mock_cluster.rs b/new-mock-engine-store/src/mock_cluster.rs index 5be1f40759c..191fe046d87 100644 --- a/new-mock-engine-store/src/mock_cluster.rs +++ b/new-mock-engine-store/src/mock_cluster.rs @@ -112,6 +112,8 @@ pub struct Cluster> { pub test_data: TestData, } +impl> std::panic::UnwindSafe for Cluster {} + impl> Cluster { pub fn new( id: u64, @@ -294,6 +296,7 @@ impl> Cluster { self.ffi_helper_lst.push(ffi_helper_set); } + // If None, use the last in the list, which is added by create_ffi_helper_set. pub fn associate_ffi_helper_set(&mut self, index: Option, node_id: u64) { let mut ffi_helper_set = if let Some(i) = index { self.ffi_helper_lst.remove(i) diff --git a/new-mock-engine-store/src/node.rs b/new-mock-engine-store/src/node.rs index a383046f51d..ced1f343692 100644 --- a/new-mock-engine-store/src/node.rs +++ b/new-mock-engine-store/src/node.rs @@ -176,6 +176,8 @@ pub struct NodeCluster { pub importer: Option>, } +impl std::panic::UnwindSafe for NodeCluster {} + impl NodeCluster { pub fn new(pd_client: Arc) -> NodeCluster { NodeCluster { diff --git a/proxy_scripts/ci_check.sh b/proxy_scripts/ci_check.sh index 53716d15036..2635f1d9a85 100755 --- a/proxy_scripts/ci_check.sh +++ b/proxy_scripts/ci_check.sh @@ -33,7 +33,6 @@ elif [[ $M == "testnew" ]]; then cargo check --package proxy_server --features="$ENABLE_FEATURES" # tests based on new-mock-engine-store, with compat for new proxy cargo test --package proxy_tests --test proxy normal::store - cargo test --package proxy_tests --test proxy normal::region cargo test --package proxy_tests --test proxy normal::config cargo test --package proxy_tests --test proxy normal::ingest cargo test --package proxy_tests --test proxy normal::restart @@ -41,6 +40,7 @@ elif [[ $M == "testnew" ]]; then cargo test --package proxy_tests --test proxy write cargo test --package proxy_tests --test proxy snapshot cargo test --package proxy_tests --test proxy config + cargo test --package proxy_tests --test proxy region cargo test --package proxy_tests --test proxy flashback cargo test --package proxy_tests --test proxy server_cluster_test elif [[ $M == "debug" ]]; then diff --git a/proxy_tests/proxy/config.rs b/proxy_tests/proxy/config.rs index 9f8fecdec25..71a1f051103 100644 --- a/proxy_tests/proxy/config.rs +++ b/proxy_tests/proxy/config.rs @@ -2,10 +2,8 @@ use clap::{App, Arg}; use proxy_server::{ config::{ - address_proxy_config, ensure_no_common_unrecognized_keys, get_last_config, - memory_limit_for_cf, setup_default_tikv_config, validate_and_persist_config, - TIFLASH_DEFAULT_ADVERTISE_LISTENING_ADDR, TIFLASH_DEFAULT_LISTENING_ADDR, - TIFLASH_DEFAULT_STATUS_ADDR, + address_proxy_config, memory_limit_for_cf, TIFLASH_DEFAULT_ADVERTISE_LISTENING_ADDR, + TIFLASH_DEFAULT_LISTENING_ADDR, TIFLASH_DEFAULT_STATUS_ADDR, }, proxy::{gen_proxy_config, gen_tikv_config}, setup::overwrite_config_with_cmd_args, diff --git a/proxy_tests/proxy/mod.rs b/proxy_tests/proxy/mod.rs index 
bca7d49ed37..c2d2336999d 100644 --- a/proxy_tests/proxy/mod.rs +++ b/proxy_tests/proxy/mod.rs @@ -11,6 +11,7 @@ mod config; mod flashback; mod normal; mod proxy; +mod region; mod server_cluster_test; mod snapshot; mod util; diff --git a/proxy_tests/proxy/normal.rs b/proxy_tests/proxy/normal.rs index 197f74e6be9..9f1574f1a38 100644 --- a/proxy_tests/proxy/normal.rs +++ b/proxy_tests/proxy/normal.rs @@ -41,149 +41,6 @@ mod store { } } -mod region { - use super::*; - - #[test] - fn test_handle_destroy() { - let (mut cluster, pd_client) = new_mock_cluster(0, 3); - - disable_auto_gen_compact_log(&mut cluster); - - // Disable default max peer count check. - pd_client.disable_default_operator(); - - cluster.run(); - cluster.must_put(b"k1", b"v1"); - let eng_ids = cluster - .engines - .iter() - .map(|e| e.0.to_owned()) - .collect::>(); - - let region = cluster.get_region(b"k1"); - let region_id = region.get_id(); - let peer_1 = find_peer(®ion, eng_ids[0]).cloned().unwrap(); - let peer_2 = find_peer(®ion, eng_ids[1]).cloned().unwrap(); - cluster.must_transfer_leader(region_id, peer_1); - - iter_ffi_helpers( - &cluster, - Some(vec![eng_ids[1]]), - &mut |_, _, ffi: &mut FFIHelperSet| { - let server = &ffi.engine_store_server; - assert!(server.kvstore.contains_key(®ion_id)); - }, - ); - - pd_client.must_remove_peer(region_id, peer_2); - - check_key( - &cluster, - b"k1", - b"v2", - Some(false), - None, - Some(vec![eng_ids[1]]), - ); - - std::thread::sleep(std::time::Duration::from_millis(100)); - // Region removed in server. - iter_ffi_helpers( - &cluster, - Some(vec![eng_ids[1]]), - &mut |_, _, ffi: &mut FFIHelperSet| { - let server = &ffi.engine_store_server; - assert!(!server.kvstore.contains_key(®ion_id)); - }, - ); - - cluster.shutdown(); - } - - #[test] - fn test_get_region_local_state() { - let (mut cluster, _pd_client) = new_mock_cluster(0, 3); - - cluster.run(); - - let k = b"k1"; - let v = b"v1"; - cluster.must_put(k, v); - check_key(&cluster, k, v, Some(true), None, None); - let region_id = cluster.get_region(k).get_id(); - - // Get RegionLocalState through ffi - unsafe { - iter_ffi_helpers( - &cluster, - None, - &mut |_id: u64, _, ffi_set: &mut FFIHelperSet| { - let f = ffi_set.proxy_helper.fn_get_region_local_state.unwrap(); - let mut state = kvproto::raft_serverpb::RegionLocalState::default(); - let mut error_msg = new_mock_engine_store::RawCppStringPtrGuard::default(); - - assert_eq!( - f( - ffi_set.proxy_helper.proxy_ptr, - region_id, - &mut state as *mut _ as _, - error_msg.as_mut(), - ), - KVGetStatus::Ok - ); - assert!(state.has_region()); - assert_eq!(state.get_state(), kvproto::raft_serverpb::PeerState::Normal); - assert!(error_msg.as_ref().is_null()); - - let mut state = kvproto::raft_serverpb::RegionLocalState::default(); - assert_eq!( - f( - ffi_set.proxy_helper.proxy_ptr, - 0, // not exist - &mut state as *mut _ as _, - error_msg.as_mut(), - ), - KVGetStatus::NotFound - ); - assert!(!state.has_region()); - assert!(error_msg.as_ref().is_null()); - - ffi_set - .proxy - .get_value_cf("none_cf", "123".as_bytes(), |value| { - let msg = value.unwrap_err(); - assert_eq!(msg, "Storage Engine Status { code: IoError, sub_code: None, sev: NoError, state: \"cf none_cf not found\" }"); - }); - ffi_set - .proxy - .get_value_cf("raft", "123".as_bytes(), |value| { - let res = value.unwrap(); - assert!(res.is_none()); - }); - - // If we have no kv engine. 
- ffi_set.proxy.set_kv_engine(None); - let res = ffi_set.proxy_helper.fn_get_region_local_state.unwrap()( - ffi_set.proxy_helper.proxy_ptr, - region_id, - &mut state as *mut _ as _, - error_msg.as_mut(), - ); - assert_eq!(res, KVGetStatus::Error); - assert!(!error_msg.as_ref().is_null()); - assert_eq!( - error_msg.as_str(), - "KV engine is not initialized".as_bytes() - ); - }, - ); - } - - cluster.shutdown(); - } -} - mod config { use super::*; diff --git a/proxy_tests/proxy/proxy.rs b/proxy_tests/proxy/proxy.rs index 70ccda23348..4c0fdf7360c 100644 --- a/proxy_tests/proxy/proxy.rs +++ b/proxy_tests/proxy/proxy.rs @@ -32,7 +32,7 @@ pub use new_mock_engine_store::{ }; pub use raft::eraftpb::MessageType; pub use raftstore::coprocessor::ConsistencyCheckMethod; -pub use test_raftstore::new_peer; +pub use test_raftstore::{new_learner_peer, new_peer}; pub use tikv_util::{ config::{ReadableDuration, ReadableSize}, store::find_peer, @@ -113,12 +113,12 @@ pub fn maybe_collect_states( let mut prev_state: HashMap = HashMap::default(); iter_ffi_helpers( cluster, - None, + store_ids, &mut |id: u64, engine: &engine_rocks::RocksEngine, ffi: &mut FFIHelperSet| { let server = &ffi.engine_store_server; if let Some(region) = server.kvstore.get(®ion_id) { let ident = match engine.get_msg::(keys::STORE_IDENT_KEY) { - Ok(Some(i)) => (i), + Ok(Some(i)) => i, _ => unreachable!(), }; prev_state.insert( diff --git a/proxy_tests/proxy/snapshot.rs b/proxy_tests/proxy/snapshot.rs index e9e6290e1c5..69211e2bdfd 100644 --- a/proxy_tests/proxy/snapshot.rs +++ b/proxy_tests/proxy/snapshot.rs @@ -1,6 +1,10 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. use crate::proxy::*; +// This is a panic while panic test, which we can not handle. +// This double panic is due to: +// 1. check_applying_snap after apply_snap. +// 2. Drop in PeerFsm which leads to check_applying_snap. // #[test] #[should_panic] fn test_delete_snapshot_after_apply() { From d60691c08bd675cb955e239e42f419a806334562 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Fri, 18 Nov 2022 18:05:01 +0800 Subject: [PATCH 02/13] f Signed-off-by: CalvinNeo --- Cargo.toml | 2 +- proxy_tests/proxy/proxy.rs | 2 +- proxy_tests/proxy/region.rs | 223 ++++++++++++++++++++++++++++++++++++ 3 files changed, 225 insertions(+), 2 deletions(-) create mode 100644 proxy_tests/proxy/region.rs diff --git a/Cargo.toml b/Cargo.toml index bee83a3842d..bff99190c58 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -417,7 +417,7 @@ rpath = false [profile.test] opt-level = 0 -debug = 0 +debug = true codegen-units = 16 lto = false incremental = true diff --git a/proxy_tests/proxy/proxy.rs b/proxy_tests/proxy/proxy.rs index 4c0fdf7360c..c194684bc1e 100644 --- a/proxy_tests/proxy/proxy.rs +++ b/proxy_tests/proxy/proxy.rs @@ -30,7 +30,7 @@ pub use new_mock_engine_store::{ }, Cluster, ProxyConfig, Simulator, TestPdClient, }; -pub use raft::eraftpb::MessageType; +pub use raft::eraftpb::{ConfChangeType, MessageType}; pub use raftstore::coprocessor::ConsistencyCheckMethod; pub use test_raftstore::{new_learner_peer, new_peer}; pub use tikv_util::{ diff --git a/proxy_tests/proxy/region.rs b/proxy_tests/proxy/region.rs new file mode 100644 index 00000000000..ffbbe3998bf --- /dev/null +++ b/proxy_tests/proxy/region.rs @@ -0,0 +1,223 @@ +// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. 
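+//! Region-level tests (`test_handle_destroy`, `test_get_region_local_state`)
+//! moved out of `normal.rs`, matching the new `region` test target added to
+//! `proxy_scripts/ci_check.sh`.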
+use crate::proxy::*; + +#[test] +fn test_handle_destroy() { + let (mut cluster, pd_client) = new_mock_cluster(0, 3); + + disable_auto_gen_compact_log(&mut cluster); + + // Disable default max peer count check. + pd_client.disable_default_operator(); + + cluster.run(); + cluster.must_put(b"k1", b"v1"); + let eng_ids = cluster + .engines + .iter() + .map(|e| e.0.to_owned()) + .collect::>(); + + let region = cluster.get_region(b"k1"); + let region_id = region.get_id(); + let peer_1 = find_peer(®ion, eng_ids[0]).cloned().unwrap(); + let peer_2 = find_peer(®ion, eng_ids[1]).cloned().unwrap(); + cluster.must_transfer_leader(region_id, peer_1); + + iter_ffi_helpers( + &cluster, + Some(vec![eng_ids[1]]), + &mut |_, _, ffi: &mut FFIHelperSet| { + let server = &ffi.engine_store_server; + assert!(server.kvstore.contains_key(®ion_id)); + }, + ); + + pd_client.must_remove_peer(region_id, peer_2); + + check_key( + &cluster, + b"k1", + b"v2", + Some(false), + None, + Some(vec![eng_ids[1]]), + ); + + std::thread::sleep(std::time::Duration::from_millis(100)); + // Region removed in server. + iter_ffi_helpers( + &cluster, + Some(vec![eng_ids[1]]), + &mut |_, _, ffi: &mut FFIHelperSet| { + let server = &ffi.engine_store_server; + assert!(!server.kvstore.contains_key(®ion_id)); + }, + ); + + cluster.shutdown(); +} + +#[test] +fn test_get_region_local_state() { + let (mut cluster, _pd_client) = new_mock_cluster(0, 3); + + cluster.run(); + + let k = b"k1"; + let v = b"v1"; + cluster.must_put(k, v); + check_key(&cluster, k, v, Some(true), None, None); + let region_id = cluster.get_region(k).get_id(); + + // Get RegionLocalState through ffi + unsafe { + iter_ffi_helpers( + &cluster, + None, + &mut |_id: u64, _, ffi_set: &mut FFIHelperSet| { + let f = ffi_set.proxy_helper.fn_get_region_local_state.unwrap(); + let mut state = kvproto::raft_serverpb::RegionLocalState::default(); + let mut error_msg = new_mock_engine_store::RawCppStringPtrGuard::default(); + + assert_eq!( + f( + ffi_set.proxy_helper.proxy_ptr, + region_id, + &mut state as *mut _ as _, + error_msg.as_mut(), + ), + KVGetStatus::Ok + ); + assert!(state.has_region()); + assert_eq!(state.get_state(), kvproto::raft_serverpb::PeerState::Normal); + assert!(error_msg.as_ref().is_null()); + + let mut state = kvproto::raft_serverpb::RegionLocalState::default(); + assert_eq!( + f( + ffi_set.proxy_helper.proxy_ptr, + 0, // not exist + &mut state as *mut _ as _, + error_msg.as_mut(), + ), + KVGetStatus::NotFound + ); + assert!(!state.has_region()); + assert!(error_msg.as_ref().is_null()); + + ffi_set + .proxy + .get_value_cf("none_cf", "123".as_bytes(), |value| { + let msg = value.unwrap_err(); + assert_eq!(msg, "Storage Engine Status { code: IoError, sub_code: None, sev: NoError, state: \"cf none_cf not found\" }"); + }); + ffi_set + .proxy + .get_value_cf("raft", "123".as_bytes(), |value| { + let res = value.unwrap(); + assert!(res.is_none()); + }); + + // If we have no kv engine. + ffi_set.proxy.set_kv_engine(None); + let res = ffi_set.proxy_helper.fn_get_region_local_state.unwrap()( + ffi_set.proxy_helper.proxy_ptr, + region_id, + &mut state as *mut _ as _, + error_msg.as_mut(), + ); + assert_eq!(res, KVGetStatus::Error); + assert!(!error_msg.as_ref().is_null()); + assert_eq!( + error_msg.as_str(), + "KV engine is not initialized".as_bytes() + ); + }, + ); + } + + cluster.shutdown(); +} + +/// This test is very important. +/// If make sure we can add learner peer for a store which is not started +/// actually. 
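+/// In other words, it checks that a learner peer can be added for a store
+/// whose node has not actually been started yet.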
+#[test] +fn test_add_learner_peer_before_start_by_simple() { + let (mut cluster, pd_client) = new_mock_cluster(0, 3); + disable_auto_gen_compact_log(&mut cluster); + // Disable default max peer count check. + pd_client.disable_default_operator(); + + let _ = cluster.run(); + cluster.must_put(b"k1", b"v1"); + check_key(&cluster, b"k1", b"v1", Some(true), None, Some(vec![1])); + + pd_client.must_add_peer(1, new_learner_peer(4, 4)); + + cluster.must_put(b"k3", b"v3"); + check_key(&cluster, b"k3", b"v3", Some(true), None, None); + let new_states = collect_all_states(&cluster, 1); + assert_eq!(new_states.len(), 3); + for i in new_states.keys() { + assert_eq!( + new_states + .get(i) + .unwrap() + .in_disk_region_state + .get_region() + .get_peers() + .len(), + 3 + 1 // Learner + ); + } + + cluster.shutdown(); +} + +/// This test is very important. +/// If make sure we can add learner peer for a store which is not started +/// actually. +#[test] +fn test_add_learner_peer_before_start_by_joint() { + let (mut cluster, pd_client) = new_mock_cluster(0, 3); + disable_auto_gen_compact_log(&mut cluster); + // Disable default max peer count check. + pd_client.disable_default_operator(); + + let _ = cluster.run_conf_change(); + cluster.must_put(b"k1", b"v1"); + check_key(&cluster, b"k1", b"v1", Some(true), None, Some(vec![1])); + + pd_client.must_joint_confchange( + 1, + vec![ + (ConfChangeType::AddNode, new_peer(2, 2)), + (ConfChangeType::AddNode, new_peer(3, 3)), + (ConfChangeType::AddLearnerNode, new_learner_peer(4, 4)), + (ConfChangeType::AddLearnerNode, new_learner_peer(5, 5)), + ], + ); + assert!(pd_client.is_in_joint(1)); + pd_client.must_leave_joint(1); + + cluster.must_put(b"k3", b"v3"); + check_key(&cluster, b"k3", b"v3", Some(true), None, None); + let new_states = collect_all_states(&cluster, 1); + assert_eq!(new_states.len(), 3); + for i in new_states.keys() { + assert_eq!( + new_states + .get(i) + .unwrap() + .in_disk_region_state + .get_region() + .get_peers() + .len(), + 1 + 2 /* AddPeer */ + 2 // Learner + ); + } + + cluster.shutdown(); +} From 4db05be7fa1b02abf3dd0933caba2b8c28008243 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Sat, 19 Nov 2022 00:43:22 +0800 Subject: [PATCH 03/13] tmp test Signed-off-by: CalvinNeo --- new-mock-engine-store/src/lib.rs | 2 + new-mock-engine-store/src/mock_cluster.rs | 31 +++++++- proxy_tests/proxy/region.rs | 90 ++++++++++++++++++++++- 3 files changed, 119 insertions(+), 4 deletions(-) diff --git a/new-mock-engine-store/src/lib.rs b/new-mock-engine-store/src/lib.rs index 74bafa3eaec..c28957ec606 100644 --- a/new-mock-engine-store/src/lib.rs +++ b/new-mock-engine-store/src/lib.rs @@ -1016,6 +1016,8 @@ unsafe extern "C" fn ffi_pre_handle_snapshot( "pre handle snaps"; "peer_id" => peer_id, "store_id" => node_id, + "index" => index, + "term" => term, "region" => ?region.region, "snap len" => snaps.len, ); diff --git a/new-mock-engine-store/src/mock_cluster.rs b/new-mock-engine-store/src/mock_cluster.rs index 191fe046d87..85fcbc86615 100644 --- a/new-mock-engine-store/src/mock_cluster.rs +++ b/new-mock-engine-store/src/mock_cluster.rs @@ -92,7 +92,7 @@ pub struct TestData { pub struct Cluster> { // Helper to set ffi_helper_set. 
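// Made `pub` below so tests can observe how many helper sets are still
// unassigned, e.g. `assert_eq!(cluster.ffi_helper_lst.len(), 2)`.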
- ffi_helper_lst: Vec, + pub ffi_helper_lst: Vec, pub ffi_helper_set: Arc>>, pub cfg: Config, @@ -258,6 +258,12 @@ impl> Cluster { region_id } + pub fn run_conf_change_no_start(&mut self) -> u64 { + self.create_engines(); + let region_id = self.bootstrap_conf_change(); + region_id + } + pub fn create_ffi_helper_set( &mut self, engines: Engines, @@ -328,11 +334,24 @@ impl> Cluster { } pub fn start(&mut self) -> ServerResult<()> { + self.start_with(Default::default()) + } + + pub fn start_with(&mut self, skip_set: HashSet) -> ServerResult<()> { init_global_ffi_helper_set(); // Try recover from last shutdown. - let node_ids: Vec = self.engines.iter().map(|(&id, _)| id).collect(); + let mut node_ids: Vec = self.engines.iter().map(|(&id, _)| id).collect(); + node_ids.sort(); + let mut cnt: usize = 0; for node_id in node_ids { + if skip_set.contains(&cnt) { + tikv_util::info!("skip start at {} is {}", cnt, node_id); + cnt += 1; + continue; + } else { + cnt += 1; + } debug!("recover node"; "node_id" => node_id); let _engines = self.engines.get_mut(&node_id).unwrap().clone(); let _key_mgr = self.key_managers_map[&node_id].clone(); @@ -352,7 +371,13 @@ impl> Cluster { } // Try start new nodes. + // Normally, this branch will not be called, since self.engines are already + // added in bootstrap_region o bootstrap_conf_change. + cnt = 0; for _ in 0..self.count - self.engines.len() { + if !skip_set.empty() { + panic!("Error when start with skip set"); + } let (router, system) = create_raft_batch_system(&self.cfg.raft_store); self.create_engine(Some(router.clone())); @@ -383,6 +408,8 @@ impl> Cluster { self.key_managers_map.insert(node_id, key_manager.clone()); self.associate_ffi_helper_set(None, node_id); } + assert_eq!(self.count, self.engines.len()); + assert_eq!(self.count, self.dbs.len()); Ok(()) } diff --git a/proxy_tests/proxy/region.rs b/proxy_tests/proxy/region.rs index ffbbe3998bf..1606fbc937e 100644 --- a/proxy_tests/proxy/region.rs +++ b/proxy_tests/proxy/region.rs @@ -1,3 +1,7 @@ +use std::iter::FromIterator; + +use collections::HashSet; + // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. use crate::proxy::*; @@ -144,7 +148,7 @@ fn test_get_region_local_state() { /// If make sure we can add learner peer for a store which is not started /// actually. #[test] -fn test_add_learner_peer_before_start_by_simple() { +fn test_add_invalid_learner_peer_by_simple() { let (mut cluster, pd_client) = new_mock_cluster(0, 3); disable_auto_gen_compact_log(&mut cluster); // Disable default max peer count check. @@ -180,7 +184,7 @@ fn test_add_learner_peer_before_start_by_simple() { /// If make sure we can add learner peer for a store which is not started /// actually. #[test] -fn test_add_learner_peer_before_start_by_joint() { +fn test_add_invalid_learner_peer_by_joint() { let (mut cluster, pd_client) = new_mock_cluster(0, 3); disable_auto_gen_compact_log(&mut cluster); // Disable default max peer count check. @@ -221,3 +225,85 @@ fn test_add_learner_peer_before_start_by_joint() { cluster.shutdown(); } + +/// This test is very important. +/// If make sure we can add learner peer for a store which is not started +/// actually. +#[test] +fn test_add_learner_peer_before_start_by_joint() { + let (mut cluster, pd_client) = new_mock_cluster(0, 5); + fail::cfg("on_pre_persist_with_finish", "return").unwrap(); + cluster.cfg.proxy_compat = false; + disable_auto_gen_compact_log(&mut cluster); + // Disable default max peer count check. 
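// (Otherwise PD would schedule replica operators on its own while the test
// drives the conf changes manually.)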
+ pd_client.disable_default_operator(); + + let _ = cluster.run_conf_change_no_start(); + let _ = cluster.start_with(HashSet::from_iter( + vec![3, 4].into_iter().map(|x| x as usize), + )); + cluster.must_put(b"k1", b"v1"); + check_key(&cluster, b"k1", b"v1", Some(true), None, Some(vec![1])); + + pd_client.must_joint_confchange( + 1, + vec![ + (ConfChangeType::AddNode, new_peer(2, 2)), + (ConfChangeType::AddNode, new_peer(3, 3)), + (ConfChangeType::AddLearnerNode, new_learner_peer(4, 4)), + (ConfChangeType::AddLearnerNode, new_learner_peer(5, 5)), + ], + ); + assert!(pd_client.is_in_joint(1)); + pd_client.must_leave_joint(1); + + cluster.must_put(b"k3", b"v3"); + check_key( + &cluster, + b"k3", + b"v3", + Some(true), + None, + Some(vec![1, 2, 3]), + ); + // let new_states = maybe_collect_states(&cluster, 1, Some(vec![1,2,3])); + // assert_eq!(new_states.len(), 3); + // for i in new_states.keys() { + // assert_eq!( + // new_states + // .get(i) + // .unwrap() + // .in_disk_region_state + // .get_region() + // .get_peers() + // .len(), + // 1 + 2 /* AddPeer */ + 2 // Learner + // ); + // } + + assert_eq!(cluster.ffi_helper_lst.len(), 2); + cluster + .start_with(HashSet::from_iter( + vec![0, 1, 2].into_iter().map(|x| x as usize), + )) + .unwrap(); + + pd_client.must_joint_confchange( + 1, + vec![ + (ConfChangeType::AddNode, new_peer(2, 2)), + (ConfChangeType::AddNode, new_peer(3, 3)), + (ConfChangeType::AddLearnerNode, new_learner_peer(4, 4)), + (ConfChangeType::AddLearnerNode, new_learner_peer(5, 5)), + ], + ); + // assert!(pd_client.is_in_joint(1)); + pd_client.must_leave_joint(1); + + cluster.must_put(b"k4", b"v4"); + // std::thread::sleep(std::time::Duration::from_millis(3000)); + check_key(&cluster, b"k4", b"v4", Some(true), None, None); + + fail::remove("on_pre_persist_with_finish"); + cluster.shutdown(); +} From 4983de1d4e4281ad3cb4bf89235a145693bae99d Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Mon, 21 Nov 2022 10:02:15 +0800 Subject: [PATCH 04/13] f Signed-off-by: CalvinNeo --- new-mock-engine-store/src/mock_cluster.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/new-mock-engine-store/src/mock_cluster.rs b/new-mock-engine-store/src/mock_cluster.rs index 85fcbc86615..5a3de859518 100644 --- a/new-mock-engine-store/src/mock_cluster.rs +++ b/new-mock-engine-store/src/mock_cluster.rs @@ -375,7 +375,7 @@ impl> Cluster { // added in bootstrap_region o bootstrap_conf_change. cnt = 0; for _ in 0..self.count - self.engines.len() { - if !skip_set.empty() { + if !skip_set.is_empty() { panic!("Error when start with skip set"); } let (router, system) = create_raft_batch_system(&self.cfg.raft_store); From 64a03034d58ec78d039b16a24f728a90ebec45b8 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Mon, 21 Nov 2022 16:58:23 +0800 Subject: [PATCH 05/13] polish Signed-off-by: CalvinNeo --- new-mock-engine-store/src/mock_cluster.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/new-mock-engine-store/src/mock_cluster.rs b/new-mock-engine-store/src/mock_cluster.rs index 5a3de859518..1c3175d4a7a 100644 --- a/new-mock-engine-store/src/mock_cluster.rs +++ b/new-mock-engine-store/src/mock_cluster.rs @@ -264,6 +264,8 @@ impl> Cluster { region_id } + /// We need to create FFIHelperSet while we create engine. + /// And later set its `node_id` when we are allocated one when start. 
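+    /// (That is, the set is created before any node id is known;
+    /// `associate_ffi_helper_set` later binds it to the id allocated at start.)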
pub fn create_ffi_helper_set( &mut self, engines: Engines, @@ -302,7 +304,8 @@ impl> Cluster { self.ffi_helper_lst.push(ffi_helper_set); } - // If None, use the last in the list, which is added by create_ffi_helper_set. + // If index is None, use the last in the list, which is added by create_ffi_helper_set. + // In most cases, index is `Some(0)`, which means we will use the first. pub fn associate_ffi_helper_set(&mut self, index: Option, node_id: u64) { let mut ffi_helper_set = if let Some(i) = index { self.ffi_helper_lst.remove(i) @@ -343,14 +346,11 @@ impl> Cluster { // Try recover from last shutdown. let mut node_ids: Vec = self.engines.iter().map(|(&id, _)| id).collect(); node_ids.sort(); - let mut cnt: usize = 0; - for node_id in node_ids { + for (cnt, node_id) in node_ids.iter().enumerate() { + let node_id = *node_id; if skip_set.contains(&cnt) { tikv_util::info!("skip start at {} is {}", cnt, node_id); - cnt += 1; continue; - } else { - cnt += 1; } debug!("recover node"; "node_id" => node_id); let _engines = self.engines.get_mut(&node_id).unwrap().clone(); @@ -372,8 +372,7 @@ impl> Cluster { // Try start new nodes. // Normally, this branch will not be called, since self.engines are already - // added in bootstrap_region o bootstrap_conf_change. - cnt = 0; + // added in bootstrap_region or bootstrap_conf_change. for _ in 0..self.count - self.engines.len() { if !skip_set.is_empty() { panic!("Error when start with skip set"); From 4e1f025f74e9262e4d23df93542479d845265482 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Mon, 21 Nov 2022 22:33:58 +0800 Subject: [PATCH 06/13] enrich tests Signed-off-by: CalvinNeo --- new-mock-engine-store/src/mock_cluster.rs | 9 +- proxy_tests/proxy/proxy.rs | 3 +- proxy_tests/proxy/region.rs | 100 ++++++++++++++++------ 3 files changed, 84 insertions(+), 28 deletions(-) diff --git a/new-mock-engine-store/src/mock_cluster.rs b/new-mock-engine-store/src/mock_cluster.rs index 1c3175d4a7a..51f7d9276d5 100644 --- a/new-mock-engine-store/src/mock_cluster.rs +++ b/new-mock-engine-store/src/mock_cluster.rs @@ -304,8 +304,9 @@ impl> Cluster { self.ffi_helper_lst.push(ffi_helper_set); } - // If index is None, use the last in the list, which is added by create_ffi_helper_set. - // In most cases, index is `Some(0)`, which means we will use the first. + // If index is None, use the last in the list, which is added by + // create_ffi_helper_set. In most cases, index is `Some(0)`, which means we + // will use the first. 
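+    // A short sketch of the two call patterns used in this file:
+    //   cluster.associate_ffi_helper_set(None, node_id);    // take the set just pushed by create_ffi_helper_set
+    //   cluster.associate_ffi_helper_set(Some(0), node_id); // take the oldest set, at the front of the list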
pub fn associate_ffi_helper_set(&mut self, index: Option, node_id: u64) { let mut ffi_helper_set = if let Some(i) = index { self.ffi_helper_lst.remove(i) @@ -1044,6 +1045,10 @@ impl> Cluster { &self.engines[&node_id].kv } + pub fn get_engines(&self, node_id: u64) -> &Engines { + &self.engines[&node_id] + } + pub fn get_raw_engine(&self, node_id: u64) -> Arc { Arc::clone(self.engines[&node_id].kv.bad_downcast()) } diff --git a/proxy_tests/proxy/proxy.rs b/proxy_tests/proxy/proxy.rs index c194684bc1e..5d3ee337152 100644 --- a/proxy_tests/proxy/proxy.rs +++ b/proxy_tests/proxy/proxy.rs @@ -10,7 +10,7 @@ pub use std::{ }; pub use engine_store_ffi::{KVGetStatus, RaftStoreProxyFFI}; -pub use engine_traits::{MiscExt, CF_DEFAULT, CF_LOCK, CF_WRITE}; +pub use engine_traits::{MiscExt, Mutable, WriteBatch, CF_DEFAULT, CF_LOCK, CF_WRITE}; // use engine_store_ffi::config::{ensure_no_common_unrecognized_keys, ProxyConfig}; pub use engine_traits::{Peekable, CF_RAFT}; pub use kvproto::{ @@ -34,6 +34,7 @@ pub use raft::eraftpb::{ConfChangeType, MessageType}; pub use raftstore::coprocessor::ConsistencyCheckMethod; pub use test_raftstore::{new_learner_peer, new_peer}; pub use tikv_util::{ + box_err, box_try, config::{ReadableDuration, ReadableSize}, store::find_peer, time::Duration, diff --git a/proxy_tests/proxy/region.rs b/proxy_tests/proxy/region.rs index 1606fbc937e..1f88c54fed1 100644 --- a/proxy_tests/proxy/region.rs +++ b/proxy_tests/proxy/region.rs @@ -226,6 +226,29 @@ fn test_add_invalid_learner_peer_by_joint() { cluster.shutdown(); } +use engine_traits::{Engines, KvEngine, RaftEngine}; +use raftstore::store::{write_initial_apply_state, write_initial_raft_state}; + +pub fn prepare_bootstrap_cluster_with( + engines: &Engines, + region: &metapb::Region, +) -> raftstore::Result<()> { + let mut state = RegionLocalState::default(); + state.set_region(region.clone()); + + let mut wb = engines.kv.write_batch(); + box_try!(wb.put_msg(keys::PREPARE_BOOTSTRAP_KEY, region)); + box_try!(wb.put_msg_cf(CF_RAFT, &keys::region_state_key(region.get_id()), &state)); + write_initial_apply_state(&mut wb, region.get_id())?; + wb.write()?; + engines.sync_kv()?; + + let mut raft_wb = engines.raft.log_batch(1024); + write_initial_raft_state(&mut raft_wb, region.get_id())?; + box_try!(engines.raft.consume(&mut raft_wb, true)); + Ok(()) +} + /// This test is very important. /// If make sure we can add learner peer for a store which is not started /// actually. 
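+    // Without this, the not-yet-started stores could only catch up on the
+    // region through a raft snapshot once they come up.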
@@ -266,44 +289,71 @@ fn test_add_learner_peer_before_start_by_joint() { None, Some(vec![1, 2, 3]), ); - // let new_states = maybe_collect_states(&cluster, 1, Some(vec![1,2,3])); - // assert_eq!(new_states.len(), 3); - // for i in new_states.keys() { - // assert_eq!( - // new_states - // .get(i) - // .unwrap() - // .in_disk_region_state - // .get_region() - // .get_peers() - // .len(), - // 1 + 2 /* AddPeer */ + 2 // Learner - // ); - // } + let new_states = maybe_collect_states(&cluster, 1, Some(vec![1, 2, 3])); + assert_eq!(new_states.len(), 3); + for i in new_states.keys() { + assert_eq!( + new_states + .get(i) + .unwrap() + .in_disk_region_state + .get_region() + .get_peers() + .len(), + 1 + 2 /* AddPeer */ + 2 // Learner + ); + } + + let region = new_states + .get(&1) + .unwrap() + .in_disk_region_state + .get_region(); assert_eq!(cluster.ffi_helper_lst.len(), 2); + + for id in vec![4, 5] { + let engines = cluster.get_engines(id); + assert!(prepare_bootstrap_cluster_with(engines, region).is_ok()); + } + cluster .start_with(HashSet::from_iter( vec![0, 1, 2].into_iter().map(|x| x as usize), )) .unwrap(); - pd_client.must_joint_confchange( - 1, - vec![ - (ConfChangeType::AddNode, new_peer(2, 2)), - (ConfChangeType::AddNode, new_peer(3, 3)), - (ConfChangeType::AddLearnerNode, new_learner_peer(4, 4)), - (ConfChangeType::AddLearnerNode, new_learner_peer(5, 5)), - ], - ); - // assert!(pd_client.is_in_joint(1)); - pd_client.must_leave_joint(1); + // pd_client.must_joint_confchange( + // 1, + // vec![ + // (ConfChangeType::AddNode, new_peer(2, 2)), + // (ConfChangeType::AddNode, new_peer(3, 3)), + // (ConfChangeType::AddLearnerNode, new_learner_peer(4, 4)), + // (ConfChangeType::AddLearnerNode, new_learner_peer(5, 5)), + // ], + // ); + // // assert!(pd_client.is_in_joint(1)); + // pd_client.must_leave_joint(1); cluster.must_put(b"k4", b"v4"); // std::thread::sleep(std::time::Duration::from_millis(3000)); check_key(&cluster, b"k4", b"v4", Some(true), None, None); + let new_states = maybe_collect_states(&cluster, 1, None); + assert_eq!(new_states.len(), 5); + for i in new_states.keys() { + assert_eq!( + new_states + .get(i) + .unwrap() + .in_disk_region_state + .get_region() + .get_peers() + .len(), + 1 + 2 /* AddPeer */ + 2 // Learner + ); + } + fail::remove("on_pre_persist_with_finish"); cluster.shutdown(); } From 93fe8a2ac94c37dd3dfbdea2050cf230a82e0128 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Tue, 22 Nov 2022 11:02:02 +0800 Subject: [PATCH 07/13] fmt Signed-off-by: CalvinNeo --- new-mock-engine-store/src/mock_cluster.rs | 3 +-- proxy_tests/proxy/region.rs | 18 +++--------------- 2 files changed, 4 insertions(+), 17 deletions(-) diff --git a/new-mock-engine-store/src/mock_cluster.rs b/new-mock-engine-store/src/mock_cluster.rs index 51f7d9276d5..e91a14a4d56 100644 --- a/new-mock-engine-store/src/mock_cluster.rs +++ b/new-mock-engine-store/src/mock_cluster.rs @@ -260,8 +260,7 @@ impl> Cluster { pub fn run_conf_change_no_start(&mut self) -> u64 { self.create_engines(); - let region_id = self.bootstrap_conf_change(); - region_id + self.bootstrap_conf_change() } /// We need to create FFIHelperSet while we create engine. diff --git a/proxy_tests/proxy/region.rs b/proxy_tests/proxy/region.rs index 1f88c54fed1..0906bf48b39 100644 --- a/proxy_tests/proxy/region.rs +++ b/proxy_tests/proxy/region.rs @@ -148,7 +148,7 @@ fn test_get_region_local_state() { /// If make sure we can add learner peer for a store which is not started /// actually. 
#[test] -fn test_add_invalid_learner_peer_by_simple() { +fn test_add_absent_learner_peer_by_simple() { let (mut cluster, pd_client) = new_mock_cluster(0, 3); disable_auto_gen_compact_log(&mut cluster); // Disable default max peer count check. @@ -184,7 +184,7 @@ fn test_add_invalid_learner_peer_by_simple() { /// If make sure we can add learner peer for a store which is not started /// actually. #[test] -fn test_add_invalid_learner_peer_by_joint() { +fn test_add_absent_learner_peer_by_joint() { let (mut cluster, pd_client) = new_mock_cluster(0, 3); disable_auto_gen_compact_log(&mut cluster); // Disable default max peer count check. @@ -312,6 +312,7 @@ fn test_add_learner_peer_before_start_by_joint() { .get_region(); assert_eq!(cluster.ffi_helper_lst.len(), 2); + // Explicitly set region for store 4 and 5. for id in vec![4, 5] { let engines = cluster.get_engines(id); assert!(prepare_bootstrap_cluster_with(engines, region).is_ok()); @@ -323,20 +324,7 @@ fn test_add_learner_peer_before_start_by_joint() { )) .unwrap(); - // pd_client.must_joint_confchange( - // 1, - // vec![ - // (ConfChangeType::AddNode, new_peer(2, 2)), - // (ConfChangeType::AddNode, new_peer(3, 3)), - // (ConfChangeType::AddLearnerNode, new_learner_peer(4, 4)), - // (ConfChangeType::AddLearnerNode, new_learner_peer(5, 5)), - // ], - // ); - // // assert!(pd_client.is_in_joint(1)); - // pd_client.must_leave_joint(1); - cluster.must_put(b"k4", b"v4"); - // std::thread::sleep(std::time::Duration::from_millis(3000)); check_key(&cluster, b"k4", b"v4", Some(true), None, None); let new_states = maybe_collect_states(&cluster, 1, None); From ad9fc1480f133df5ba9b317d27e40218508bc5cb Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Tue, 22 Nov 2022 17:54:32 +0800 Subject: [PATCH 08/13] try reduce snapshot part1 Signed-off-by: CalvinNeo --- new-mock-engine-store/src/lib.rs | 2 +- new-mock-engine-store/src/mock_cluster.rs | 22 ++- proxy_tests/proxy/proxy.rs | 56 ++++++- proxy_tests/proxy/region.rs | 188 +++++++++++++++++++--- 4 files changed, 244 insertions(+), 24 deletions(-) diff --git a/new-mock-engine-store/src/lib.rs b/new-mock-engine-store/src/lib.rs index c28957ec606..a0bbcbab2f6 100644 --- a/new-mock-engine-store/src/lib.rs +++ b/new-mock-engine-store/src/lib.rs @@ -183,7 +183,7 @@ pub fn make_new_region( region } -fn write_kv_in_mem(region: &mut Box, cf_index: usize, k: &[u8], v: &[u8]) { +pub fn write_kv_in_mem(region: &mut Box, cf_index: usize, k: &[u8], v: &[u8]) { let data = &mut region.data[cf_index]; let pending_delete = &mut region.pending_delete[cf_index]; let pending_write = &mut region.pending_write[cf_index]; diff --git a/new-mock-engine-store/src/mock_cluster.rs b/new-mock-engine-store/src/mock_cluster.rs index e91a14a4d56..cc577314af7 100644 --- a/new-mock-engine-store/src/mock_cluster.rs +++ b/new-mock-engine-store/src/mock_cluster.rs @@ -247,12 +247,14 @@ impl> Cluster { pub fn run(&mut self) { self.create_engines(); self.bootstrap_region().unwrap(); + self.bootstrap_ffi_helper_set(); self.start().unwrap(); } pub fn run_conf_change(&mut self) -> u64 { self.create_engines(); let region_id = self.bootstrap_conf_change(); + self.bootstrap_ffi_helper_set(); // Will not start new nodes in `start` self.start().unwrap(); region_id @@ -260,7 +262,9 @@ impl> Cluster { pub fn run_conf_change_no_start(&mut self) -> u64 { self.create_engines(); - self.bootstrap_conf_change() + let region_id = self.bootstrap_conf_change(); + self.bootstrap_ffi_helper_set(); + region_id } /// We need to create FFIHelperSet while we create engine. 
@@ -319,6 +323,17 @@ impl> Cluster { .insert(node_id, ffi_helper_set); } + pub fn bootstrap_ffi_helper_set(&mut self) { + let mut node_ids: Vec = self.engines.iter().map(|(&id, _)| id).collect(); + // We force iterate engines in sorted order. + node_ids.sort(); + for (_, node_id) in node_ids.iter().enumerate() { + let node_id = *node_id; + // Always at the front of the vector. + self.associate_ffi_helper_set(Some(0), node_id); + } + } + pub fn create_engine( &mut self, router: Option>, @@ -344,7 +359,9 @@ impl> Cluster { init_global_ffi_helper_set(); // Try recover from last shutdown. + // `self.engines` is inited in bootstrap_region or bootstrap_conf_change. let mut node_ids: Vec = self.engines.iter().map(|(&id, _)| id).collect(); + // We force iterate engines in sorted order. node_ids.sort(); for (cnt, node_id) in node_ids.iter().enumerate() { let node_id = *node_id; @@ -355,8 +372,6 @@ impl> Cluster { debug!("recover node"; "node_id" => node_id); let _engines = self.engines.get_mut(&node_id).unwrap().clone(); let _key_mgr = self.key_managers_map[&node_id].clone(); - // Always at the front of the vector. - self.associate_ffi_helper_set(Some(0), node_id); // Like TiKVServer::init self.run_node(node_id)?; // Since we use None to create_ffi_helper_set, we must init again. @@ -993,6 +1008,7 @@ impl> Cluster { reqs: Vec, ) -> result::Result { let resp = self.request(region_key, reqs, false, Duration::from_secs(5), true); + debug!("!!!!! batch_put {:?}", resp); if resp.get_header().has_error() { Err(resp.get_header().get_error().clone()) } else { diff --git a/proxy_tests/proxy/proxy.rs b/proxy_tests/proxy/proxy.rs index 5d3ee337152..ca5cdca09e5 100644 --- a/proxy_tests/proxy/proxy.rs +++ b/proxy_tests/proxy/proxy.rs @@ -10,7 +10,9 @@ pub use std::{ }; pub use engine_store_ffi::{KVGetStatus, RaftStoreProxyFFI}; -pub use engine_traits::{MiscExt, Mutable, WriteBatch, CF_DEFAULT, CF_LOCK, CF_WRITE}; +pub use engine_traits::{ + MiscExt, Mutable, RaftLogBatch, WriteBatch, CF_DEFAULT, CF_LOCK, CF_WRITE, +}; // use engine_store_ffi::config::{ensure_no_common_unrecognized_keys, ProxyConfig}; pub use engine_traits::{Peekable, CF_RAFT}; pub use kvproto::{ @@ -22,13 +24,14 @@ pub use kvproto::{ }; pub use new_mock_engine_store::{ config::Config, + make_new_region, mock_cluster::{new_put_cmd, new_request, FFIHelperSet}, must_get_equal, must_get_none, node::NodeCluster, transport_simulate::{ CloneFilterFactory, CollectSnapshotFilter, Direction, RegionPacketFilter, }, - Cluster, ProxyConfig, Simulator, TestPdClient, + write_kv_in_mem, Cluster, ProxyConfig, Simulator, TestPdClient, }; pub use raft::eraftpb::{ConfChangeType, MessageType}; pub use raftstore::coprocessor::ConsistencyCheckMethod; @@ -198,6 +201,55 @@ pub fn must_get_mem( ) } +pub fn must_put_and_check_key_with_generator (String, String)>( + cluster: &mut Cluster, + gen: F, + from: u64, + to: u64, + in_mem: Option, + in_disk: Option, + engines: Option>, +) { + for i in from..to { + let (k, v) = gen(i); + cluster.must_put(k.as_bytes(), v.as_bytes()); + } + for i in from..to { + let (k, v) = gen(i); + check_key( + &cluster, + k.as_bytes(), + v.as_bytes(), + in_mem, + in_disk, + engines.clone(), + ); + } +} + +pub fn must_put_and_check_key( + cluster: &mut Cluster, + from: u64, + to: u64, + in_mem: Option, + in_disk: Option, + engines: Option>, +) { + must_put_and_check_key_with_generator( + cluster, + |i: u64| { + let k = format!("k{}", i); + let v = format!("v{}", i); + (k, v) + }, + from, + to, + in_mem, + in_disk, + engines.clone(), + ); +} + pub 
fn check_key( cluster: &Cluster, k: &[u8], diff --git a/proxy_tests/proxy/region.rs b/proxy_tests/proxy/region.rs index 0906bf48b39..9f2157e244b 100644 --- a/proxy_tests/proxy/region.rs +++ b/proxy_tests/proxy/region.rs @@ -249,12 +249,11 @@ pub fn prepare_bootstrap_cluster_with( Ok(()) } -/// This test is very important. -/// If make sure we can add learner peer for a store which is not started -/// actually. -#[test] -fn test_add_learner_peer_before_start_by_joint() { +fn new_later_add_learner_cluster)>( + initer: F, +) -> (Cluster, Arc) { let (mut cluster, pd_client) = new_mock_cluster(0, 5); + // Make sure we persist before generate snapshot. fail::cfg("on_pre_persist_with_finish", "return").unwrap(); cluster.cfg.proxy_compat = false; disable_auto_gen_compact_log(&mut cluster); @@ -265,8 +264,7 @@ fn test_add_learner_peer_before_start_by_joint() { let _ = cluster.start_with(HashSet::from_iter( vec![3, 4].into_iter().map(|x| x as usize), )); - cluster.must_put(b"k1", b"v1"); - check_key(&cluster, b"k1", b"v1", Some(true), None, Some(vec![1])); + initer(&mut cluster); pd_client.must_joint_confchange( 1, @@ -280,16 +278,11 @@ fn test_add_learner_peer_before_start_by_joint() { assert!(pd_client.is_in_joint(1)); pd_client.must_leave_joint(1); - cluster.must_put(b"k3", b"v3"); - check_key( - &cluster, - b"k3", - b"v3", - Some(true), - None, - Some(vec![1, 2, 3]), - ); + (cluster, pd_client) +} +fn later_bootstrap_learner_peer(cluster: &mut Cluster) { + // Check if the voters has correct learner peer. let new_states = maybe_collect_states(&cluster, 1, Some(vec![1, 2, 3])); assert_eq!(new_states.len(), 3); for i in new_states.keys() { @@ -305,19 +298,38 @@ fn test_add_learner_peer_before_start_by_joint() { ); } + let new_states = maybe_collect_states(&cluster, 1, Some(vec![1, 2, 3])); let region = new_states .get(&1) .unwrap() .in_disk_region_state .get_region(); - assert_eq!(cluster.ffi_helper_lst.len(), 2); - + // assert_eq!(cluster.ffi_helper_lst.len(), 2); // Explicitly set region for store 4 and 5. 
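+    // `prepare_bootstrap_cluster_with` writes the RegionLocalState plus the
+    // initial apply state and raft state for the region on each listed store.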
for id in vec![4, 5] { let engines = cluster.get_engines(id); assert!(prepare_bootstrap_cluster_with(engines, region).is_ok()); } +} + +#[test] +fn test_add_delayed_started_learner_by_joint() { + let (mut cluster, pd_client) = new_later_add_learner_cluster(|c: &mut Cluster| { + c.must_put(b"k1", b"v1"); + check_key(c, b"k1", b"v1", Some(true), None, Some(vec![1])); + }); + cluster.must_put(b"k2", b"v2"); + check_key( + &cluster, + b"k2", + b"v2", + Some(true), + None, + Some(vec![1, 2, 3]), + ); + + later_bootstrap_learner_peer(&mut cluster); cluster .start_with(HashSet::from_iter( vec![0, 1, 2].into_iter().map(|x| x as usize), @@ -345,3 +357,143 @@ fn test_add_learner_peer_before_start_by_joint() { fail::remove("on_pre_persist_with_finish"); cluster.shutdown(); } + +pub fn copy_meta_from( + source_engines: &Engines, + target_engines: &Engines, + source: &Box, + target: &mut Box, +) -> raftstore::Result<()> { + let region_id = source.region.get_id(); + + let mut wb = target_engines.kv.write_batch(); + let mut raft_wb = target_engines.raft.log_batch(1024); + + box_try!(wb.put_msg(keys::PREPARE_BOOTSTRAP_KEY, &source.region)); + + // region local state + let mut state = RegionLocalState::default(); + state.set_region(source.region.clone()); + box_try!(wb.put_msg_cf(CF_RAFT, &keys::region_state_key(region_id), &state)); + + // apply state + wb.put_msg_cf( + CF_RAFT, + &keys::apply_state_key(region_id), + &source.apply_state, + )?; + target.apply_state = source.apply_state.clone(); + target.applied_term = source.applied_term; + + wb.write()?; + target_engines.sync_kv()?; + + // raft state + { + let key = keys::raft_state_key(region_id); + let raft_state = source_engines + .raft + .get_msg_cf(CF_DEFAULT, &key) + .unwrap() + .unwrap(); + raft_wb.put_raft_state(region_id, &raft_state)?; + }; + box_try!(target_engines.raft.consume(&mut raft_wb, true)); + Ok(()) +} + +#[test] +fn test_add_delayed_started_learner_snapshot() { + let (mut cluster, pd_client) = new_later_add_learner_cluster(|c: &mut Cluster| { + c.must_put(b"k1", b"v1"); + check_key(c, b"k1", b"v1", Some(true), None, Some(vec![1])); + }); + + must_put_and_check_key_with_generator( + &mut cluster, + |i: u64| (format!("k{}", i), (0..10240).map(|_| "X").collect()), + 10, + 20, + Some(true), + None, + Some(vec![1, 2, 3]), + ); + + // Force a compact log, so the leader have to send snapshot then. + let region = cluster.get_region(b"k1"); + let region_id = region.get_id(); + let prev_states = maybe_collect_states(&cluster, region_id, Some(vec![1, 2, 3])); + { + let (compact_index, compact_term) = get_valid_compact_index(&prev_states); + assert!(compact_index > 15); + let compact_log = test_raftstore::new_compact_log_request(compact_index, compact_term); + let req = + test_raftstore::new_admin_request(region_id, region.get_region_epoch(), compact_log); + let _ = cluster + .call_command_on_leader(req, Duration::from_secs(3)) + .unwrap(); + } + + later_bootstrap_learner_peer(&mut cluster); + // After that, we manually compose data, to avoid snapshot sending. 
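+    // Concretely: clone the leader's in-memory region (its mock kvstore entry)
+    // into stores 4 and 5 via `make_new_region` + `write_kv_in_mem`, so they
+    // already hold the data a snapshot would have delivered.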
+ let leader_region_1 = cluster + .ffi_helper_set + .lock() + .unwrap() + .get_mut(&1) + .unwrap() + .engine_store_server + .kvstore + .get(&1) + .unwrap() + .clone(); + iter_ffi_helpers( + &cluster, + Some(vec![4, 5]), + &mut |id: u64, engine: &engine_rocks::RocksEngine, ffi: &mut FFIHelperSet| { + let server = &mut ffi.engine_store_server; + assert!(server.kvstore.get(®ion_id).is_none()); + let new_region = make_new_region(Some(leader_region_1.region.clone()), Some(id)); + server + .kvstore + .insert(leader_region_1.region.get_id(), Box::new(new_region)); + if let Some(region) = server.kvstore.get_mut(®ion_id) { + for cf in 0..3 { + for (k, v) in &leader_region_1.data[cf] { + write_kv_in_mem(region, cf, k.as_slice(), v.as_slice()); + } + } + } else { + panic!("error"); + } + }, + ); + + cluster + .start_with(HashSet::from_iter( + vec![0, 1, 2].into_iter().map(|x| x as usize), + )) + .unwrap(); + + cluster.must_put(b"z1", b"v1"); + check_key(&cluster, b"z1", b"v1", Some(true), None, None); + + // Check if every node has the correct configuation. + let new_states = maybe_collect_states(&cluster, 1, None); + assert_eq!(new_states.len(), 5); + for i in new_states.keys() { + assert_eq!( + new_states + .get(i) + .unwrap() + .in_disk_region_state + .get_region() + .get_peers() + .len(), + 1 + 2 /* AddPeer */ + 2 // Learner + ); + } + + fail::remove("on_pre_persist_with_finish"); + cluster.shutdown(); +} From e97a90aacc02ee7edceab42baa5ba494fef19b8d Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Wed, 23 Nov 2022 18:41:59 +0800 Subject: [PATCH 09/13] eliminate snapshot Signed-off-by: CalvinNeo --- Cargo.lock | 2 - Cargo.toml | 6 +- .../raftstore/src/store/entry_storage.rs | 5 + components/raftstore/src/store/fsm/apply.rs | 4 + components/raftstore/src/store/fsm/store.rs | 5 + .../raftstore/src/store/peer_storage.rs | 18 +- engine_tiflash/src/proxy_utils.rs | 2 + new-mock-engine-store/src/mock_cluster.rs | 5 +- new-mock-engine-store/src/node.rs | 17 +- proxy_tests/proxy/proxy.rs | 10 +- proxy_tests/proxy/region.rs | 157 +++++++++++++----- src/server/node.rs | 12 ++ 12 files changed, 190 insertions(+), 53 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 10138379132..3ded3d8e68a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4484,7 +4484,6 @@ dependencies = [ [[package]] name = "raft" version = "0.7.0" -source = "git+https://github.com/tikv/raft-rs?branch=master#2357cb22760719bcd107a90d1e64ef505bdb1e15" dependencies = [ "bytes", "fxhash", @@ -4532,7 +4531,6 @@ dependencies = [ [[package]] name = "raft-proto" version = "0.7.0" -source = "git+https://github.com/tikv/raft-rs?branch=master#2357cb22760719bcd107a90d1e64ef505bdb1e15" dependencies = [ "bytes", "protobuf", diff --git a/Cargo.toml b/Cargo.toml index bff99190c58..af8c479d1cf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -193,8 +193,10 @@ zipf = "6.1.0" prometheus = { git = "https://github.com/solotzg/rust-prometheus.git", rev = "b4fe98a06a58d29f9b9987a0d7186f6ed5230193" } # TODO: remove this when new raft-rs is published. 
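# The `path` overrides below point at a local raft-rs checkout; the upstream
# git dependencies are kept commented out so they can be restored.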
-raft = { git = "https://github.com/tikv/raft-rs", branch = "master" } -raft-proto = { git = "https://github.com/tikv/raft-rs", branch = "master" } +#raft = { git = "https://github.com/tikv/raft-rs", branch = "master" } +raft = { path = "/Users/calvin/tiflash/raft-rs" } +# raft-proto = { git = "https://github.com/tikv/raft-rs", branch = "master" } +raft-proto = { path = "/Users/calvin/tiflash/raft-rs/proto" } protobuf = { git = "https://github.com/pingcap/rust-protobuf", branch = "v2.8" } protobuf-codegen = { git = "https://github.com/pingcap/rust-protobuf", branch = "v2.8" } diff --git a/components/raftstore/src/store/entry_storage.rs b/components/raftstore/src/store/entry_storage.rs index 705e2a776fa..469b11f42ef 100644 --- a/components/raftstore/src/store/entry_storage.rs +++ b/components/raftstore/src/store/entry_storage.rs @@ -940,15 +940,19 @@ impl EntryStorage { pub fn term(&self, idx: u64) -> raft::Result { if idx == self.truncated_index() { + debug!("!!!!! entry_storage A idx {}", idx); return Ok(self.truncated_term()); } self.check_range(idx, idx + 1)?; if self.truncated_term() == self.last_term || idx == self.last_index() { + debug!("!!!!! entry_storage B {} idx {} self.last_term {} self.last_index() {} state {:?}", self.peer_id, idx, self.last_term, self.last_index(), self.raft_state); return Ok(self.last_term); } if let Some(e) = self.cache.entry(idx) { + debug!("!!!!! entry_storage C idx {}", idx); Ok(e.get_term()) } else { + debug!("!!!!! entry_storage D idx {}", idx); Ok(self .raft_engine .get_entry(self.region_id, idx) @@ -965,6 +969,7 @@ impl EntryStorage { #[inline] pub fn last_index(&self) -> u64 { + debug!("!!!!! entry_storage B {} last_index state {:?}", self.peer_id, self.raft_state); last_index(&self.raft_state) } diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index bd582d1c24a..aee469be15a 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -3813,6 +3813,10 @@ where // witness shouldn't generate snapshot. return; } + + debug!("!!!!! generate snap"; + "to_store_id" => snap_task.to_store_id, + ); let applied_index = self.delegate.apply_state.get_applied_index(); let need_sync = apply_ctx .apply_res diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs index 28c0db02eee..ce76251796c 100644 --- a/components/raftstore/src/store/fsm/store.rs +++ b/components/raftstore/src/store/fsm/store.rs @@ -1132,8 +1132,13 @@ impl RaftPollerBuilder { let mut merging_count = 0; let mut meta = self.store_meta.lock().unwrap(); let mut replication_state = self.global_replication_state.lock().unwrap(); + debug!("!!!!! scan start {}", store_id); + if store_id == 5 { + debug!("!!!!! read C {:?}", self.engines.kv.get_value_cf(CF_RAFT, "!!!ZZZ".as_bytes())); + } kv_engine.scan(CF_RAFT, start_key, end_key, false, |key, value| { let (region_id, suffix) = box_try!(keys::decode_region_meta_key(key)); + debug!("!!!!! scan {} {}", store_id, region_id); if suffix != keys::REGION_STATE_SUFFIX { return Ok(true); } diff --git a/components/raftstore/src/store/peer_storage.rs b/components/raftstore/src/store/peer_storage.rs index 0d10b1f36cf..df6d1660177 100644 --- a/components/raftstore/src/store/peer_storage.rs +++ b/components/raftstore/src/store/peer_storage.rs @@ -174,16 +174,26 @@ fn init_raft_state( region: &Region, ) -> Result { if let Some(state) = engines.raft.get_raft_state(region.get_id())? 
{ + debug!( + "init_raft_state 111"; + ); return Ok(state); } let mut raft_state = RaftLocalState::default(); if util::is_region_initialized(region) { + debug!( + "init_raft_state 222"; + ); // new split region raft_state.last_index = RAFT_INIT_LOG_INDEX; raft_state.mut_hard_state().set_term(RAFT_INIT_LOG_TERM); raft_state.mut_hard_state().set_commit(RAFT_INIT_LOG_INDEX); engines.raft.put_raft_state(region.get_id(), &raft_state)?; + } else { + debug!( + "init_raft_state 333"; + ); } Ok(raft_state) } @@ -299,14 +309,17 @@ where peer_id: u64, tag: String, ) -> Result> { + let raft_state = init_raft_state(&engines, region)?; + let apply_state = init_apply_state(&engines, region)?; + debug!( "creating storage on specified path"; "region_id" => region.get_id(), "peer_id" => peer_id, "path" => ?engines.kv.path(), + "raft_state" => ?raft_state, + "apply_state" => ?apply_state, ); - let raft_state = init_raft_state(&engines, region)?; - let apply_state = init_apply_state(&engines, region)?; let entry_storage = EntryStorage::new( peer_id, @@ -523,6 +536,7 @@ where "peer_id" => self.peer_id, "request_index" => request_index, "request_peer" => to, + "bt" => ?std::backtrace::Backtrace::capture(), ); let (sender, receiver) = mpsc::sync_channel(1); diff --git a/engine_tiflash/src/proxy_utils.rs b/engine_tiflash/src/proxy_utils.rs index 194a2292e46..5682831b19c 100644 --- a/engine_tiflash/src/proxy_utils.rs +++ b/engine_tiflash/src/proxy_utils.rs @@ -1,6 +1,7 @@ use crate::util::get_cf_handle; pub fn do_write(cf: &str, key: &[u8]) -> bool { + fail::fail_point!("before_tiflash_do_write", |_| true); match cf { engine_traits::CF_RAFT => true, engine_traits::CF_DEFAULT => { @@ -32,6 +33,7 @@ fn cf_to_name(batch: &crate::RocksWriteBatchVec, cf: u32) -> &'static str { fn check_double_write(batch: &crate::RocksWriteBatchVec) { // It will fire if we write by both observer(compat_old_proxy is not enabled) // and TiKV's WriteBatch. + fail::fail_point!("before_tiflash_check_double_write", |_| { return }); tikv_util::debug!("check if double write happens"); for wb in batch.wbs.iter() { for (_, cf, k, _) in wb.iter() { diff --git a/new-mock-engine-store/src/mock_cluster.rs b/new-mock-engine-store/src/mock_cluster.rs index cc577314af7..8ab14d91709 100644 --- a/new-mock-engine-store/src/mock_cluster.rs +++ b/new-mock-engine-store/src/mock_cluster.rs @@ -370,7 +370,10 @@ impl> Cluster { continue; } debug!("recover node"; "node_id" => node_id); - let _engines = self.engines.get_mut(&node_id).unwrap().clone(); + let engines = self.engines.get_mut(&node_id).unwrap().clone(); + if node_id == 5 { + debug!("!!!!! read A {:?}", engines.kv.get_value_cf(engine_traits::CF_RAFT, "!!!ZZZ".as_bytes())); + } let _key_mgr = self.key_managers_map[&node_id].clone(); // Like TiKVServer::init self.run_node(node_id)?; diff --git a/new-mock-engine-store/src/node.rs b/new-mock-engine-store/src/node.rs index ced1f343692..af5134c091b 100644 --- a/new-mock-engine-store/src/node.rs +++ b/new-mock-engine-store/src/node.rs @@ -340,8 +340,15 @@ impl Simulator for NodeCluster { Module::Coprocessor, Box::new(SplitCheckConfigManager(split_scheduler.clone())), ); - + if node_id == 5 { + debug!("!!!!! 
read B {:?} {:?}", engines.kv.get_value_cf(engine_traits::CF_RAFT, "!!!ZZZ".as_bytes()), + engines.raft.get_msg_cf::( + engine_traits::CF_DEFAULT, + &keys::raft_state_key(1), + )); + } node.try_bootstrap_store(engines.clone())?; + node.start( engines.clone(), simulate_trans.clone(), @@ -365,7 +372,13 @@ impl Simulator for NodeCluster { ); assert!(node_id == 0 || node_id == node.id()); - + if node_id == 5 { + debug!("!!!!! read B3 {:?} {:?}", engines.kv.get_value_cf(engine_traits::CF_RAFT, "!!!ZZZ".as_bytes()), + engines.raft.get_msg_cf::( + engine_traits::CF_DEFAULT, + &keys::raft_state_key(1), + )); + } let node_id = node.id(); debug!( "node_id: {} tmp: {:?}", diff --git a/proxy_tests/proxy/proxy.rs b/proxy_tests/proxy/proxy.rs index ca5cdca09e5..127dc45e57f 100644 --- a/proxy_tests/proxy/proxy.rs +++ b/proxy_tests/proxy/proxy.rs @@ -20,7 +20,7 @@ pub use kvproto::{ metapb, metapb::RegionEpoch, raft_cmdpb::{AdminCmdType, AdminRequest, CmdType, Request}, - raft_serverpb::{PeerState, RaftApplyState, RegionLocalState, StoreIdent}, + raft_serverpb::{PeerState, RaftApplyState, RegionLocalState, StoreIdent, RaftLocalState}, }; pub use new_mock_engine_store::{ config::Config, @@ -43,6 +43,7 @@ pub use tikv_util::{ time::Duration, HandyRwLock, }; +pub use engine_traits::RaftEngineDebug; // TODO Need refactor if moved to raft-engine pub fn get_region_local_state( @@ -67,6 +68,10 @@ pub fn get_apply_state(engine: &engine_rocks::RocksEngine, region_id: u64) -> Ra apply_state } +pub fn get_raft_local_state(raft_engine: &ER, region_id: u64) -> RaftLocalState { + raft_engine.get_raft_state(region_id).unwrap().unwrap() +} + pub fn new_compute_hash_request() -> AdminRequest { let mut req = AdminRequest::default(); req.set_cmd_type(AdminCmdType::ComputeHash); @@ -89,6 +94,7 @@ pub struct States { pub in_memory_applied_term: u64, pub in_disk_apply_state: RaftApplyState, pub in_disk_region_state: RegionLocalState, + pub in_disk_raft_state: RaftLocalState, pub ident: StoreIdent, } @@ -120,6 +126,7 @@ pub fn maybe_collect_states( store_ids, &mut |id: u64, engine: &engine_rocks::RocksEngine, ffi: &mut FFIHelperSet| { let server = &ffi.engine_store_server; + let raft_engine = &cluster.get_engines(id).raft; if let Some(region) = server.kvstore.get(®ion_id) { let ident = match engine.get_msg::(keys::STORE_IDENT_KEY) { Ok(Some(i)) => i, @@ -132,6 +139,7 @@ pub fn maybe_collect_states( in_memory_applied_term: region.applied_term, in_disk_apply_state: get_apply_state(&engine, region_id), in_disk_region_state: get_region_local_state(&engine, region_id), + in_disk_raft_state: get_raft_local_state(raft_engine, region_id), ident, }, ); diff --git a/proxy_tests/proxy/region.rs b/proxy_tests/proxy/region.rs index 9f2157e244b..9ea5db16a16 100644 --- a/proxy_tests/proxy/region.rs +++ b/proxy_tests/proxy/region.rs @@ -1,9 +1,8 @@ -use std::iter::FromIterator; - -use collections::HashSet; - // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. 
use crate::proxy::*; +use std::iter::FromIterator; +use raft::eraftpb::Entry; +use collections::HashSet; #[test] fn test_handle_destroy() { @@ -227,7 +226,7 @@ fn test_add_absent_learner_peer_by_joint() { } use engine_traits::{Engines, KvEngine, RaftEngine}; -use raftstore::store::{write_initial_apply_state, write_initial_raft_state}; +use raftstore::store::{write_initial_apply_state, write_initial_raft_state, RAFT_INIT_LOG_INDEX}; pub fn prepare_bootstrap_cluster_with( engines: &Engines, @@ -237,7 +236,7 @@ pub fn prepare_bootstrap_cluster_with( state.set_region(region.clone()); let mut wb = engines.kv.write_batch(); - box_try!(wb.put_msg(keys::PREPARE_BOOTSTRAP_KEY, region)); + // box_try!(wb.put_msg(keys::PREPARE_BOOTSTRAP_KEY, region)); box_try!(wb.put_msg_cf(CF_RAFT, &keys::region_state_key(region.get_id()), &state)); write_initial_apply_state(&mut wb, region.get_id())?; wb.write()?; @@ -251,10 +250,12 @@ pub fn prepare_bootstrap_cluster_with( fn new_later_add_learner_cluster)>( initer: F, + learner: Vec, ) -> (Cluster, Arc) { let (mut cluster, pd_client) = new_mock_cluster(0, 5); // Make sure we persist before generate snapshot. fail::cfg("on_pre_persist_with_finish", "return").unwrap(); + cluster.cfg.proxy_compat = false; disable_auto_gen_compact_log(&mut cluster); // Disable default max peer count check. @@ -266,14 +267,15 @@ fn new_later_add_learner_cluster)>( )); initer(&mut cluster); + let mut peers = vec![ + (ConfChangeType::AddNode, new_peer(2, 2)), + (ConfChangeType::AddNode, new_peer(3, 3)), + ]; + let mut learner_peers: Vec<(ConfChangeType, kvproto::metapb::Peer)> = learner.iter().map(|i| (ConfChangeType::AddLearnerNode, new_learner_peer(*i, *i))).collect(); + peers.append(&mut learner_peers); pd_client.must_joint_confchange( 1, - vec![ - (ConfChangeType::AddNode, new_peer(2, 2)), - (ConfChangeType::AddNode, new_peer(3, 3)), - (ConfChangeType::AddLearnerNode, new_learner_peer(4, 4)), - (ConfChangeType::AddLearnerNode, new_learner_peer(5, 5)), - ], + peers, ); assert!(pd_client.is_in_joint(1)); pd_client.must_leave_joint(1); @@ -281,7 +283,7 @@ fn new_later_add_learner_cluster)>( (cluster, pd_client) } -fn later_bootstrap_learner_peer(cluster: &mut Cluster) { +fn later_bootstrap_learner_peer(cluster: &mut Cluster, peers: Vec, already_learner_count: usize) { // Check if the voters has correct learner peer. let new_states = maybe_collect_states(&cluster, 1, Some(vec![1, 2, 3])); assert_eq!(new_states.len(), 3); @@ -294,19 +296,17 @@ fn later_bootstrap_learner_peer(cluster: &mut Cluster) { .get_region() .get_peers() .len(), - 1 + 2 /* AddPeer */ + 2 // Learner + 1 + 2 /* AddPeer */ + already_learner_count // Learner ); } - let new_states = maybe_collect_states(&cluster, 1, Some(vec![1, 2, 3])); let region = new_states .get(&1) .unwrap() .in_disk_region_state .get_region(); - // assert_eq!(cluster.ffi_helper_lst.len(), 2); - // Explicitly set region for store 4 and 5. - for id in vec![4, 5] { + // Explicitly bootstrap region. 
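// Why keys::PREPARE_BOOTSTRAP_KEY stays commented out above: a rough sketch of
// the check in src/server/node.rs (shown later in this series) that the key
// would trip on restart:
//
//     if let Some(first_region) = engines.kv.get_msg(keys::PREPARE_BOOTSTRAP_KEY)? {
//         // The node assumes a cluster bootstrap is in flight and would
//         // bootstrap the whole cluster around this region again.
//     }
//
// So a delayed store must only receive its RegionLocalState plus the initial
// apply state, never the bootstrap marker.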
+ for id in peers { let engines = cluster.get_engines(id); assert!(prepare_bootstrap_cluster_with(engines, region).is_ok()); } @@ -317,7 +317,9 @@ fn test_add_delayed_started_learner_by_joint() { let (mut cluster, pd_client) = new_later_add_learner_cluster(|c: &mut Cluster| { c.must_put(b"k1", b"v1"); check_key(c, b"k1", b"v1", Some(true), None, Some(vec![1])); - }); + }, + vec![4, 5] + ); cluster.must_put(b"k2", b"v2"); check_key( @@ -329,7 +331,7 @@ fn test_add_delayed_started_learner_by_joint() { Some(vec![1, 2, 3]), ); - later_bootstrap_learner_peer(&mut cluster); + later_bootstrap_learner_peer(&mut cluster, vec![4, 5], 2); cluster .start_with(HashSet::from_iter( vec![0, 1, 2].into_iter().map(|x| x as usize), @@ -359,34 +361,49 @@ fn test_add_delayed_started_learner_by_joint() { } pub fn copy_meta_from( - source_engines: &Engines, + source_engines: &Engines, target_engines: &Engines, source: &Box, target: &mut Box, + region: kvproto::metapb::Region, ) -> raftstore::Result<()> { let region_id = source.region.get_id(); let mut wb = target_engines.kv.write_batch(); let mut raft_wb = target_engines.raft.log_batch(1024); - box_try!(wb.put_msg(keys::PREPARE_BOOTSTRAP_KEY, &source.region)); + // box_try!(wb.put_msg(keys::PREPARE_BOOTSTRAP_KEY, &source.region)); // region local state let mut state = RegionLocalState::default(); - state.set_region(source.region.clone()); + state.set_region(region); box_try!(wb.put_msg_cf(CF_RAFT, &keys::region_state_key(region_id), &state)); // apply state - wb.put_msg_cf( - CF_RAFT, - &keys::apply_state_key(region_id), - &source.apply_state, - )?; - target.apply_state = source.apply_state.clone(); - target.applied_term = source.applied_term; + { + let key = keys::apply_state_key(region_id); + let apply_state: RaftApplyState = source_engines + .kv + .get_msg_cf(CF_RAFT, &key) + .unwrap() + .unwrap(); + wb.put_msg_cf( + CF_RAFT, + &keys::apply_state_key(region_id), + &apply_state, + )?; + wb.put_cf( + CF_RAFT, + "!!!ZZZ".as_bytes(), + "VVVVV".as_bytes() + ).unwrap(); + target.apply_state = apply_state.clone(); + target.applied_term = source.applied_term; + } wb.write()?; target_engines.sync_kv()?; + debug!("!!!!! read AAA {:?}", target_engines.kv.get_value_cf(CF_RAFT, "!!!ZZZ".as_bytes())); // raft state { @@ -396,19 +413,43 @@ pub fn copy_meta_from( .get_msg_cf(CF_DEFAULT, &key) .unwrap() .unwrap(); + debug!("!!!!! raft log state {:?}", raft_state); raft_wb.put_raft_state(region_id, &raft_state)?; }; + + // raft log + let mut entries: Vec = Default::default(); + source_engines.raft.scan_entries( + region_id, + |e| { + debug!("copy raft log"; "e" => ?e); + entries.push(e.clone()); + Ok(true) + }, + ).unwrap(); + + raft_wb.append(region_id, entries)?; box_try!(target_engines.raft.consume(&mut raft_wb, true)); + Ok(()) } #[test] fn test_add_delayed_started_learner_snapshot() { + // fail::cfg("before_tiflash_check_double_write", "return").unwrap(); + // fail::cfg("before_tiflash_do_write", "return").unwrap(); let (mut cluster, pd_client) = new_later_add_learner_cluster(|c: &mut Cluster| { c.must_put(b"k1", b"v1"); check_key(c, b"k1", b"v1", Some(true), None, Some(vec![1])); - }); + },vec![4]); + // Start Leader store 4. 
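// The commented-out fail points at the top of this test are an escape hatch
// kept for local debugging: enabling them short-circuits TiFlash's write
// routing and double-write check while the copied data is replayed. A sketch,
// using the point names from engine_tiflash/src/proxy_utils.rs above:
//
//     fail::cfg("before_tiflash_do_write", "return").unwrap();
//     fail::cfg("before_tiflash_check_double_write", "return").unwrap();
//     // ... test body ...
//     fail::remove("before_tiflash_do_write");
//     fail::remove("before_tiflash_check_double_write");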
+ cluster + .start_with(HashSet::from_iter( + vec![0, 1, 2, 4].into_iter().map(|x| x as usize), + )) + .unwrap(); + must_put_and_check_key_with_generator( &mut cluster, |i: u64| (format!("k{}", i), (0..10240).map(|_| "X").collect()), @@ -416,7 +457,7 @@ fn test_add_delayed_started_learner_snapshot() { 20, Some(true), None, - Some(vec![1, 2, 3]), + Some(vec![1, 2, 3, 4]), ); // Force a compact log, so the leader have to send snapshot then. @@ -434,53 +475,80 @@ fn test_add_delayed_started_learner_snapshot() { .unwrap(); } - later_bootstrap_learner_peer(&mut cluster); + cluster.must_transfer_leader(1, new_peer(1, 1)); + + // Simulate 4 is lost, recover its data to node 5. + cluster.stop_node(4); + later_bootstrap_learner_peer(&mut cluster, vec![5], 1); // After that, we manually compose data, to avoid snapshot sending. - let leader_region_1 = cluster + let source_region_1 = cluster .ffi_helper_set .lock() .unwrap() - .get_mut(&1) + .get_mut(&4) .unwrap() .engine_store_server .kvstore .get(&1) .unwrap() .clone(); + let mut new_region_meta = source_region_1.region.clone(); + new_region_meta.mut_peers().push(new_learner_peer(5, 5)); + + // Copy all node 4's data to node 5 iter_ffi_helpers( &cluster, - Some(vec![4, 5]), + Some(vec![5]), &mut |id: u64, engine: &engine_rocks::RocksEngine, ffi: &mut FFIHelperSet| { let server = &mut ffi.engine_store_server; assert!(server.kvstore.get(®ion_id).is_none()); - let new_region = make_new_region(Some(leader_region_1.region.clone()), Some(id)); + let new_region = make_new_region(Some(source_region_1.region.clone()), Some(id)); server .kvstore - .insert(leader_region_1.region.get_id(), Box::new(new_region)); + .insert(source_region_1.region.get_id(), Box::new(new_region)); if let Some(region) = server.kvstore.get_mut(®ion_id) { for cf in 0..3 { - for (k, v) in &leader_region_1.data[cf] { + for (k, v) in &source_region_1.data[cf] { write_kv_in_mem(region, cf, k.as_slice(), v.as_slice()); } } + let source_engines = cluster.get_engines(4); + let target_engines = cluster.get_engines(5); + copy_meta_from(source_engines, target_engines, &source_region_1, region, new_region_meta.clone()).unwrap(); } else { panic!("error"); } }, ); + { + let prev_states = maybe_collect_states(&cluster, region_id, None); + assert_eq!(prev_states.get(&4).unwrap().in_disk_apply_state, prev_states.get(&5).unwrap().in_disk_apply_state); + debug!("!!!!! ssssss {} {:?}", cluster.id(), prev_states.get(&5).unwrap()); + } + // Add node 5 to cluster. + pd_client.must_add_peer(1, new_learner_peer(5, 5)); + fail::cfg("apply_on_handle_snapshot_finish_1_1", "panic").unwrap(); + // Start store 5. + + debug!("!!!!! read AA {:?}", cluster.get_engines(5).kv.get_value_cf(CF_RAFT, "!!!ZZZ".as_bytes())); cluster .start_with(HashSet::from_iter( - vec![0, 1, 2].into_iter().map(|x| x as usize), + vec![0, 1, 2, 3].into_iter().map(|x| x as usize), )) .unwrap(); + { + let prev_states = maybe_collect_states(&cluster, region_id, None); + debug!("!!!!! ssssss222 {:?}", prev_states.get(&5).unwrap()); + } + cluster.must_put(b"z1", b"v1"); - check_key(&cluster, b"z1", b"v1", Some(true), None, None); + check_key(&cluster, b"z1", b"v1", Some(true), None, Some(vec![1,2,3,5])); // Check if every node has the correct configuation. 
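// The panic action installed on "apply_on_handle_snapshot_finish_1_1" above is
// the real assertion of this test: if the instrumented snapshot-apply path is
// ever reached, the test aborts. Recovery therefore only passes when the
// hand-copied apply/raft state lets the new peer catch up from raft logs
// alone. Sketch of the guard pattern:
//
//     fail::cfg("apply_on_handle_snapshot_finish_1_1", "panic").unwrap();
//     // ... writes that must replicate without a snapshot ...
//     fail::remove("apply_on_handle_snapshot_finish_1_1");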
- let new_states = maybe_collect_states(&cluster, 1, None); - assert_eq!(new_states.len(), 5); + let new_states = maybe_collect_states(&cluster, 1, Some(vec![1,2,3,5])); + assert_eq!(new_states.len(), 4); for i in new_states.keys() { assert_eq!( new_states @@ -494,6 +562,9 @@ fn test_add_delayed_started_learner_snapshot() { ); } + fail::remove("apply_on_handle_snapshot_finish_1_1"); fail::remove("on_pre_persist_with_finish"); + // fail::remove("before_tiflash_check_double_write"); + // fail::remove("before_tiflash_do_write"); cluster.shutdown(); } diff --git a/src/server/node.rs b/src/server/node.rs index 0b654921f59..7b782a14362 100644 --- a/src/server/node.rs +++ b/src/server/node.rs @@ -232,6 +232,11 @@ where let mut meta = store_meta.lock().unwrap(); meta.store_id = Some(store_id); } + + if store_id == 5 { + debug!("!!!!! read B2 {:?} {:?}", engines.kv.get_value_cf(engine_traits::CF_RAFT, "!!!ZZZ".as_bytes()), + engines.raft.get_raft_state(1)); + } if let Some(first_region) = self.check_or_prepare_bootstrap_cluster(&engines, store_id)? { info!("trying to bootstrap cluster"; "store_id" => store_id, "region" => ?first_region); // cluster is not bootstrapped, and we choose first store to bootstrap @@ -241,6 +246,10 @@ where self.bootstrap_cluster(&engines, first_region)?; } + if store_id == 5 { + debug!("!!!!! read B22 {:?} {:?}", engines.kv.get_value_cf(engine_traits::CF_RAFT, "!!!ZZZ".as_bytes()), + engines.raft.get_raft_state(1)); + } // Put store only if the cluster is bootstrapped. info!("put store to PD"; "store" => ?&self.store); let status = self.pd_client.put_store(self.store.clone())?; @@ -427,10 +436,13 @@ where store_id: u64, ) -> Result> { if let Some(first_region) = engines.kv.get_msg(keys::PREPARE_BOOTSTRAP_KEY)? { + debug!("!!!!! check_or_prepare_bootstrap_cluster sid {}, A", store_id); Ok(Some(first_region)) } else if self.check_cluster_bootstrapped()? { + debug!("!!!!! check_or_prepare_bootstrap_cluster sid {}, B", store_id); Ok(None) } else { + debug!("!!!!! 
check_or_prepare_bootstrap_cluster sid {}, C", store_id); self.prepare_bootstrap_cluster(engines, store_id).map(Some) } } From 08b0e55b641ed08b084f14ad997b1eb23522a6b7 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Wed, 23 Nov 2022 18:53:54 +0800 Subject: [PATCH 10/13] remove all debug codes Signed-off-by: CalvinNeo --- Cargo.lock | 2 + Cargo.toml | 6 +- .../raftstore/src/store/entry_storage.rs | 5 - components/raftstore/src/store/fsm/apply.rs | 4 - components/raftstore/src/store/fsm/store.rs | 5 - .../raftstore/src/store/peer_storage.rs | 18 +-- new-mock-engine-store/src/mock_cluster.rs | 6 - new-mock-engine-store/src/node.rs | 16 +-- proxy_tests/proxy/proxy.rs | 10 +- proxy_tests/proxy/region.rs | 115 ++++++++++-------- src/server/node.rs | 12 -- 11 files changed, 79 insertions(+), 120 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3ded3d8e68a..fa64aa145a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4484,6 +4484,7 @@ dependencies = [ [[package]] name = "raft" version = "0.7.0" +source = "git+https://github.com/tikv/raft-rs?branch=master#36d3293a8b1a32c4b4115855419108386abcdc4a" dependencies = [ "bytes", "fxhash", @@ -4531,6 +4532,7 @@ dependencies = [ [[package]] name = "raft-proto" version = "0.7.0" +source = "git+https://github.com/tikv/raft-rs?branch=master#36d3293a8b1a32c4b4115855419108386abcdc4a" dependencies = [ "bytes", "protobuf", diff --git a/Cargo.toml b/Cargo.toml index af8c479d1cf..bff99190c58 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -193,10 +193,8 @@ zipf = "6.1.0" prometheus = { git = "https://github.com/solotzg/rust-prometheus.git", rev = "b4fe98a06a58d29f9b9987a0d7186f6ed5230193" } # TODO: remove this when new raft-rs is published. -#raft = { git = "https://github.com/tikv/raft-rs", branch = "master" } -raft = { path = "/Users/calvin/tiflash/raft-rs" } -# raft-proto = { git = "https://github.com/tikv/raft-rs", branch = "master" } -raft-proto = { path = "/Users/calvin/tiflash/raft-rs/proto" } +raft = { git = "https://github.com/tikv/raft-rs", branch = "master" } +raft-proto = { git = "https://github.com/tikv/raft-rs", branch = "master" } protobuf = { git = "https://github.com/pingcap/rust-protobuf", branch = "v2.8" } protobuf-codegen = { git = "https://github.com/pingcap/rust-protobuf", branch = "v2.8" } diff --git a/components/raftstore/src/store/entry_storage.rs b/components/raftstore/src/store/entry_storage.rs index 469b11f42ef..705e2a776fa 100644 --- a/components/raftstore/src/store/entry_storage.rs +++ b/components/raftstore/src/store/entry_storage.rs @@ -940,19 +940,15 @@ impl EntryStorage { pub fn term(&self, idx: u64) -> raft::Result { if idx == self.truncated_index() { - debug!("!!!!! entry_storage A idx {}", idx); return Ok(self.truncated_term()); } self.check_range(idx, idx + 1)?; if self.truncated_term() == self.last_term || idx == self.last_index() { - debug!("!!!!! entry_storage B {} idx {} self.last_term {} self.last_index() {} state {:?}", self.peer_id, idx, self.last_term, self.last_index(), self.raft_state); return Ok(self.last_term); } if let Some(e) = self.cache.entry(idx) { - debug!("!!!!! entry_storage C idx {}", idx); Ok(e.get_term()) } else { - debug!("!!!!! entry_storage D idx {}", idx); Ok(self .raft_engine .get_entry(self.region_id, idx) @@ -969,7 +965,6 @@ impl EntryStorage { #[inline] pub fn last_index(&self) -> u64 { - debug!("!!!!! 
entry_storage B {} last_index state {:?}", self.peer_id, self.raft_state); last_index(&self.raft_state) } diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index aee469be15a..bd582d1c24a 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -3813,10 +3813,6 @@ where // witness shouldn't generate snapshot. return; } - - debug!("!!!!! generate snap"; - "to_store_id" => snap_task.to_store_id, - ); let applied_index = self.delegate.apply_state.get_applied_index(); let need_sync = apply_ctx .apply_res diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs index ce76251796c..28c0db02eee 100644 --- a/components/raftstore/src/store/fsm/store.rs +++ b/components/raftstore/src/store/fsm/store.rs @@ -1132,13 +1132,8 @@ impl RaftPollerBuilder { let mut merging_count = 0; let mut meta = self.store_meta.lock().unwrap(); let mut replication_state = self.global_replication_state.lock().unwrap(); - debug!("!!!!! scan start {}", store_id); - if store_id == 5 { - debug!("!!!!! read C {:?}", self.engines.kv.get_value_cf(CF_RAFT, "!!!ZZZ".as_bytes())); - } kv_engine.scan(CF_RAFT, start_key, end_key, false, |key, value| { let (region_id, suffix) = box_try!(keys::decode_region_meta_key(key)); - debug!("!!!!! scan {} {}", store_id, region_id); if suffix != keys::REGION_STATE_SUFFIX { return Ok(true); } diff --git a/components/raftstore/src/store/peer_storage.rs b/components/raftstore/src/store/peer_storage.rs index df6d1660177..0d10b1f36cf 100644 --- a/components/raftstore/src/store/peer_storage.rs +++ b/components/raftstore/src/store/peer_storage.rs @@ -174,26 +174,16 @@ fn init_raft_state( region: &Region, ) -> Result { if let Some(state) = engines.raft.get_raft_state(region.get_id())? { - debug!( - "init_raft_state 111"; - ); return Ok(state); } let mut raft_state = RaftLocalState::default(); if util::is_region_initialized(region) { - debug!( - "init_raft_state 222"; - ); // new split region raft_state.last_index = RAFT_INIT_LOG_INDEX; raft_state.mut_hard_state().set_term(RAFT_INIT_LOG_TERM); raft_state.mut_hard_state().set_commit(RAFT_INIT_LOG_INDEX); engines.raft.put_raft_state(region.get_id(), &raft_state)?; - } else { - debug!( - "init_raft_state 333"; - ); } Ok(raft_state) } @@ -309,17 +299,14 @@ where peer_id: u64, tag: String, ) -> Result> { - let raft_state = init_raft_state(&engines, region)?; - let apply_state = init_apply_state(&engines, region)?; - debug!( "creating storage on specified path"; "region_id" => region.get_id(), "peer_id" => peer_id, "path" => ?engines.kv.path(), - "raft_state" => ?raft_state, - "apply_state" => ?apply_state, ); + let raft_state = init_raft_state(&engines, region)?; + let apply_state = init_apply_state(&engines, region)?; let entry_storage = EntryStorage::new( peer_id, @@ -536,7 +523,6 @@ where "peer_id" => self.peer_id, "request_index" => request_index, "request_peer" => to, - "bt" => ?std::backtrace::Backtrace::capture(), ); let (sender, receiver) = mpsc::sync_channel(1); diff --git a/new-mock-engine-store/src/mock_cluster.rs b/new-mock-engine-store/src/mock_cluster.rs index 8ab14d91709..c75ee573fd6 100644 --- a/new-mock-engine-store/src/mock_cluster.rs +++ b/new-mock-engine-store/src/mock_cluster.rs @@ -370,11 +370,6 @@ impl> Cluster { continue; } debug!("recover node"; "node_id" => node_id); - let engines = self.engines.get_mut(&node_id).unwrap().clone(); - if node_id == 5 { - debug!("!!!!! 
read A {:?}", engines.kv.get_value_cf(engine_traits::CF_RAFT, "!!!ZZZ".as_bytes())); - } - let _key_mgr = self.key_managers_map[&node_id].clone(); // Like TiKVServer::init self.run_node(node_id)?; // Since we use None to create_ffi_helper_set, we must init again. @@ -1011,7 +1006,6 @@ impl> Cluster { reqs: Vec, ) -> result::Result { let resp = self.request(region_key, reqs, false, Duration::from_secs(5), true); - debug!("!!!!! batch_put {:?}", resp); if resp.get_header().has_error() { Err(resp.get_header().get_error().clone()) } else { diff --git a/new-mock-engine-store/src/node.rs b/new-mock-engine-store/src/node.rs index af5134c091b..954050a7f2c 100644 --- a/new-mock-engine-store/src/node.rs +++ b/new-mock-engine-store/src/node.rs @@ -340,13 +340,7 @@ impl Simulator for NodeCluster { Module::Coprocessor, Box::new(SplitCheckConfigManager(split_scheduler.clone())), ); - if node_id == 5 { - debug!("!!!!! read B {:?} {:?}", engines.kv.get_value_cf(engine_traits::CF_RAFT, "!!!ZZZ".as_bytes()), - engines.raft.get_msg_cf::( - engine_traits::CF_DEFAULT, - &keys::raft_state_key(1), - )); - } + node.try_bootstrap_store(engines.clone())?; node.start( @@ -372,13 +366,7 @@ impl Simulator for NodeCluster { ); assert!(node_id == 0 || node_id == node.id()); - if node_id == 5 { - debug!("!!!!! read B3 {:?} {:?}", engines.kv.get_value_cf(engine_traits::CF_RAFT, "!!!ZZZ".as_bytes()), - engines.raft.get_msg_cf::( - engine_traits::CF_DEFAULT, - &keys::raft_state_key(1), - )); - } + let node_id = node.id(); debug!( "node_id: {} tmp: {:?}", diff --git a/proxy_tests/proxy/proxy.rs b/proxy_tests/proxy/proxy.rs index 127dc45e57f..d5a5aebdbcb 100644 --- a/proxy_tests/proxy/proxy.rs +++ b/proxy_tests/proxy/proxy.rs @@ -11,7 +11,7 @@ pub use std::{ pub use engine_store_ffi::{KVGetStatus, RaftStoreProxyFFI}; pub use engine_traits::{ - MiscExt, Mutable, RaftLogBatch, WriteBatch, CF_DEFAULT, CF_LOCK, CF_WRITE, + MiscExt, Mutable, RaftEngineDebug, RaftLogBatch, WriteBatch, CF_DEFAULT, CF_LOCK, CF_WRITE, }; // use engine_store_ffi::config::{ensure_no_common_unrecognized_keys, ProxyConfig}; pub use engine_traits::{Peekable, CF_RAFT}; @@ -20,7 +20,7 @@ pub use kvproto::{ metapb, metapb::RegionEpoch, raft_cmdpb::{AdminCmdType, AdminRequest, CmdType, Request}, - raft_serverpb::{PeerState, RaftApplyState, RegionLocalState, StoreIdent, RaftLocalState}, + raft_serverpb::{PeerState, RaftApplyState, RaftLocalState, RegionLocalState, StoreIdent}, }; pub use new_mock_engine_store::{ config::Config, @@ -43,7 +43,6 @@ pub use tikv_util::{ time::Duration, HandyRwLock, }; -pub use engine_traits::RaftEngineDebug; // TODO Need refactor if moved to raft-engine pub fn get_region_local_state( @@ -68,7 +67,10 @@ pub fn get_apply_state(engine: &engine_rocks::RocksEngine, region_id: u64) -> Ra apply_state } -pub fn get_raft_local_state(raft_engine: &ER, region_id: u64) -> RaftLocalState { +pub fn get_raft_local_state( + raft_engine: &ER, + region_id: u64, +) -> RaftLocalState { raft_engine.get_raft_state(region_id).unwrap().unwrap() } diff --git a/proxy_tests/proxy/region.rs b/proxy_tests/proxy/region.rs index 9ea5db16a16..42878435698 100644 --- a/proxy_tests/proxy/region.rs +++ b/proxy_tests/proxy/region.rs @@ -1,8 +1,10 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. 
-use crate::proxy::*; use std::iter::FromIterator; -use raft::eraftpb::Entry; + use collections::HashSet; +use raft::eraftpb::Entry; + +use crate::proxy::*; #[test] fn test_handle_destroy() { @@ -255,7 +257,7 @@ fn new_later_add_learner_cluster)>( let (mut cluster, pd_client) = new_mock_cluster(0, 5); // Make sure we persist before generate snapshot. fail::cfg("on_pre_persist_with_finish", "return").unwrap(); - + cluster.cfg.proxy_compat = false; disable_auto_gen_compact_log(&mut cluster); // Disable default max peer count check. @@ -271,19 +273,23 @@ fn new_later_add_learner_cluster)>( (ConfChangeType::AddNode, new_peer(2, 2)), (ConfChangeType::AddNode, new_peer(3, 3)), ]; - let mut learner_peers: Vec<(ConfChangeType, kvproto::metapb::Peer)> = learner.iter().map(|i| (ConfChangeType::AddLearnerNode, new_learner_peer(*i, *i))).collect(); + let mut learner_peers: Vec<(ConfChangeType, kvproto::metapb::Peer)> = learner + .iter() + .map(|i| (ConfChangeType::AddLearnerNode, new_learner_peer(*i, *i))) + .collect(); peers.append(&mut learner_peers); - pd_client.must_joint_confchange( - 1, - peers, - ); + pd_client.must_joint_confchange(1, peers); assert!(pd_client.is_in_joint(1)); pd_client.must_leave_joint(1); (cluster, pd_client) } -fn later_bootstrap_learner_peer(cluster: &mut Cluster, peers: Vec, already_learner_count: usize) { +fn later_bootstrap_learner_peer( + cluster: &mut Cluster, + peers: Vec, + already_learner_count: usize, +) { // Check if the voters has correct learner peer. let new_states = maybe_collect_states(&cluster, 1, Some(vec![1, 2, 3])); assert_eq!(new_states.len(), 3); @@ -314,11 +320,12 @@ fn later_bootstrap_learner_peer(cluster: &mut Cluster, peers: Vec| { - c.must_put(b"k1", b"v1"); - check_key(c, b"k1", b"v1", Some(true), None, Some(vec![1])); - }, - vec![4, 5] + let (mut cluster, pd_client) = new_later_add_learner_cluster( + |c: &mut Cluster| { + c.must_put(b"k1", b"v1"); + check_key(c, b"k1", b"v1", Some(true), None, Some(vec![1])); + }, + vec![4, 5], ); cluster.must_put(b"k2", b"v2"); @@ -361,7 +368,10 @@ fn test_add_delayed_started_learner_by_joint() { } pub fn copy_meta_from( - source_engines: &Engines, + source_engines: &Engines< + impl KvEngine, + impl RaftEngine + engine_traits::Peekable + RaftEngineDebug, + >, target_engines: &Engines, source: &Box, target: &mut Box, @@ -387,23 +397,15 @@ pub fn copy_meta_from( .get_msg_cf(CF_RAFT, &key) .unwrap() .unwrap(); - wb.put_msg_cf( - CF_RAFT, - &keys::apply_state_key(region_id), - &apply_state, - )?; - wb.put_cf( - CF_RAFT, - "!!!ZZZ".as_bytes(), - "VVVVV".as_bytes() - ).unwrap(); + wb.put_msg_cf(CF_RAFT, &keys::apply_state_key(region_id), &apply_state)?; + wb.put_cf(CF_RAFT, "!!!ZZZ".as_bytes(), "VVVVV".as_bytes()) + .unwrap(); target.apply_state = apply_state.clone(); target.applied_term = source.applied_term; } wb.write()?; target_engines.sync_kv()?; - debug!("!!!!! read AAA {:?}", target_engines.kv.get_value_cf(CF_RAFT, "!!!ZZZ".as_bytes())); // raft state { @@ -413,20 +415,19 @@ pub fn copy_meta_from( .get_msg_cf(CF_DEFAULT, &key) .unwrap() .unwrap(); - debug!("!!!!! 
raft log state {:?}", raft_state); raft_wb.put_raft_state(region_id, &raft_state)?; }; - + // raft log let mut entries: Vec = Default::default(); - source_engines.raft.scan_entries( - region_id, - |e| { + source_engines + .raft + .scan_entries(region_id, |e| { debug!("copy raft log"; "e" => ?e); entries.push(e.clone()); Ok(true) - }, - ).unwrap(); + }) + .unwrap(); raft_wb.append(region_id, entries)?; box_try!(target_engines.raft.consume(&mut raft_wb, true)); @@ -438,10 +439,13 @@ pub fn copy_meta_from( fn test_add_delayed_started_learner_snapshot() { // fail::cfg("before_tiflash_check_double_write", "return").unwrap(); // fail::cfg("before_tiflash_do_write", "return").unwrap(); - let (mut cluster, pd_client) = new_later_add_learner_cluster(|c: &mut Cluster| { - c.must_put(b"k1", b"v1"); - check_key(c, b"k1", b"v1", Some(true), None, Some(vec![1])); - },vec![4]); + let (mut cluster, pd_client) = new_later_add_learner_cluster( + |c: &mut Cluster| { + c.must_put(b"k1", b"v1"); + check_key(c, b"k1", b"v1", Some(true), None, Some(vec![1])); + }, + vec![4], + ); // Start Leader store 4. cluster @@ -449,7 +453,7 @@ fn test_add_delayed_started_learner_snapshot() { vec![0, 1, 2, 4].into_iter().map(|x| x as usize), )) .unwrap(); - + must_put_and_check_key_with_generator( &mut cluster, |i: u64| (format!("k{}", i), (0..10240).map(|_| "X").collect()), @@ -476,7 +480,7 @@ fn test_add_delayed_started_learner_snapshot() { } cluster.must_transfer_leader(1, new_peer(1, 1)); - + // Simulate 4 is lost, recover its data to node 5. cluster.stop_node(4); later_bootstrap_learner_peer(&mut cluster, vec![5], 1); @@ -514,7 +518,14 @@ fn test_add_delayed_started_learner_snapshot() { } let source_engines = cluster.get_engines(4); let target_engines = cluster.get_engines(5); - copy_meta_from(source_engines, target_engines, &source_region_1, region, new_region_meta.clone()).unwrap(); + copy_meta_from( + source_engines, + target_engines, + &source_region_1, + region, + new_region_meta.clone(), + ) + .unwrap(); } else { panic!("error"); } @@ -522,8 +533,10 @@ fn test_add_delayed_started_learner_snapshot() { ); { let prev_states = maybe_collect_states(&cluster, region_id, None); - assert_eq!(prev_states.get(&4).unwrap().in_disk_apply_state, prev_states.get(&5).unwrap().in_disk_apply_state); - debug!("!!!!! ssssss {} {:?}", cluster.id(), prev_states.get(&5).unwrap()); + assert_eq!( + prev_states.get(&4).unwrap().in_disk_apply_state, + prev_states.get(&5).unwrap().in_disk_apply_state + ); } // Add node 5 to cluster. @@ -531,23 +544,25 @@ fn test_add_delayed_started_learner_snapshot() { fail::cfg("apply_on_handle_snapshot_finish_1_1", "panic").unwrap(); // Start store 5. - debug!("!!!!! read AA {:?}", cluster.get_engines(5).kv.get_value_cf(CF_RAFT, "!!!ZZZ".as_bytes())); cluster .start_with(HashSet::from_iter( vec![0, 1, 2, 3].into_iter().map(|x| x as usize), )) .unwrap(); - { - let prev_states = maybe_collect_states(&cluster, region_id, None); - debug!("!!!!! ssssss222 {:?}", prev_states.get(&5).unwrap()); - } - + cluster.must_put(b"z1", b"v1"); - check_key(&cluster, b"z1", b"v1", Some(true), None, Some(vec![1,2,3,5])); + check_key( + &cluster, + b"z1", + b"v1", + Some(true), + None, + Some(vec![1, 2, 3, 5]), + ); // Check if every node has the correct configuation. 
- let new_states = maybe_collect_states(&cluster, 1, Some(vec![1,2,3,5])); + let new_states = maybe_collect_states(&cluster, 1, Some(vec![1, 2, 3, 5])); assert_eq!(new_states.len(), 4); for i in new_states.keys() { assert_eq!( diff --git a/src/server/node.rs b/src/server/node.rs index 7b782a14362..0b654921f59 100644 --- a/src/server/node.rs +++ b/src/server/node.rs @@ -232,11 +232,6 @@ where let mut meta = store_meta.lock().unwrap(); meta.store_id = Some(store_id); } - - if store_id == 5 { - debug!("!!!!! read B2 {:?} {:?}", engines.kv.get_value_cf(engine_traits::CF_RAFT, "!!!ZZZ".as_bytes()), - engines.raft.get_raft_state(1)); - } if let Some(first_region) = self.check_or_prepare_bootstrap_cluster(&engines, store_id)? { info!("trying to bootstrap cluster"; "store_id" => store_id, "region" => ?first_region); // cluster is not bootstrapped, and we choose first store to bootstrap @@ -246,10 +241,6 @@ where self.bootstrap_cluster(&engines, first_region)?; } - if store_id == 5 { - debug!("!!!!! read B22 {:?} {:?}", engines.kv.get_value_cf(engine_traits::CF_RAFT, "!!!ZZZ".as_bytes()), - engines.raft.get_raft_state(1)); - } // Put store only if the cluster is bootstrapped. info!("put store to PD"; "store" => ?&self.store); let status = self.pd_client.put_store(self.store.clone())?; @@ -436,13 +427,10 @@ where store_id: u64, ) -> Result> { if let Some(first_region) = engines.kv.get_msg(keys::PREPARE_BOOTSTRAP_KEY)? { - debug!("!!!!! check_or_prepare_bootstrap_cluster sid {}, A", store_id); Ok(Some(first_region)) } else if self.check_cluster_bootstrapped()? { - debug!("!!!!! check_or_prepare_bootstrap_cluster sid {}, B", store_id); Ok(None) } else { - debug!("!!!!! check_or_prepare_bootstrap_cluster sid {}, C", store_id); self.prepare_bootstrap_cluster(engines, store_id).map(Some) } } From 1e34cc987691809542589e12fe6bb73b5b4f3a10 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Thu, 24 Nov 2022 12:43:49 +0800 Subject: [PATCH 11/13] fmt Signed-off-by: CalvinNeo --- proxy_tests/proxy/region.rs | 120 +++++++++++++++++++----------------- 1 file changed, 64 insertions(+), 56 deletions(-) diff --git a/proxy_tests/proxy/region.rs b/proxy_tests/proxy/region.rs index 42878435698..1819f112402 100644 --- a/proxy_tests/proxy/region.rs +++ b/proxy_tests/proxy/region.rs @@ -375,7 +375,7 @@ pub fn copy_meta_from( target_engines: &Engines, source: &Box, target: &mut Box, - region: kvproto::metapb::Region, + new_region_meta: kvproto::metapb::Region, ) -> raftstore::Result<()> { let region_id = source.region.get_id(); @@ -386,7 +386,7 @@ pub fn copy_meta_from( // region local state let mut state = RegionLocalState::default(); - state.set_region(region); + state.set_region(new_region_meta); box_try!(wb.put_msg_cf(CF_RAFT, &keys::region_state_key(region_id), &state)); // apply state @@ -435,6 +435,64 @@ pub fn copy_meta_from( Ok(()) } +fn recover_from_peer(cluster: &Cluster, from: u64, to: u64, region_id: u64) { + let source_region_1 = cluster + .ffi_helper_set + .lock() + .unwrap() + .get_mut(&from) + .unwrap() + .engine_store_server + .kvstore + .get(®ion_id) + .unwrap() + .clone(); + + let mut new_region_meta = source_region_1.region.clone(); + new_region_meta.mut_peers().push(new_learner_peer(to, to)); + + // Copy all node `from`'s data to node `to` + iter_ffi_helpers( + cluster, + Some(vec![to]), + &mut |id: u64, engine: &engine_rocks::RocksEngine, ffi: &mut FFIHelperSet| { + let server = &mut ffi.engine_store_server; + assert!(server.kvstore.get(®ion_id).is_none()); + + let new_region = 
make_new_region(Some(source_region_1.region.clone()), Some(id)); + server + .kvstore + .insert(source_region_1.region.get_id(), Box::new(new_region)); + if let Some(region) = server.kvstore.get_mut(®ion_id) { + for cf in 0..3 { + for (k, v) in &source_region_1.data[cf] { + write_kv_in_mem(region, cf, k.as_slice(), v.as_slice()); + } + } + let source_engines = cluster.get_engines(from); + let target_engines = cluster.get_engines(to); + copy_meta_from( + source_engines, + target_engines, + &source_region_1, + region, + new_region_meta.clone(), + ) + .unwrap(); + } else { + panic!("error"); + } + }, + ); + { + let prev_states = maybe_collect_states(cluster, region_id, None); + assert_eq!( + prev_states.get(&from).unwrap().in_disk_apply_state, + prev_states.get(&to).unwrap().in_disk_apply_state + ); + } +} + #[test] fn test_add_delayed_started_learner_snapshot() { // fail::cfg("before_tiflash_check_double_write", "return").unwrap(); @@ -464,7 +522,8 @@ fn test_add_delayed_started_learner_snapshot() { Some(vec![1, 2, 3, 4]), ); - // Force a compact log, so the leader have to send snapshot then. + // Force a compact log, so the leader have to send snapshot if peer 5 not catch + // up. let region = cluster.get_region(b"k1"); let region_id = region.get_id(); let prev_states = maybe_collect_states(&cluster, region_id, Some(vec![1, 2, 3])); @@ -483,61 +542,11 @@ fn test_add_delayed_started_learner_snapshot() { // Simulate 4 is lost, recover its data to node 5. cluster.stop_node(4); + later_bootstrap_learner_peer(&mut cluster, vec![5], 1); // After that, we manually compose data, to avoid snapshot sending. - let source_region_1 = cluster - .ffi_helper_set - .lock() - .unwrap() - .get_mut(&4) - .unwrap() - .engine_store_server - .kvstore - .get(&1) - .unwrap() - .clone(); - let mut new_region_meta = source_region_1.region.clone(); - new_region_meta.mut_peers().push(new_learner_peer(5, 5)); - // Copy all node 4's data to node 5 - iter_ffi_helpers( - &cluster, - Some(vec![5]), - &mut |id: u64, engine: &engine_rocks::RocksEngine, ffi: &mut FFIHelperSet| { - let server = &mut ffi.engine_store_server; - assert!(server.kvstore.get(®ion_id).is_none()); - let new_region = make_new_region(Some(source_region_1.region.clone()), Some(id)); - server - .kvstore - .insert(source_region_1.region.get_id(), Box::new(new_region)); - if let Some(region) = server.kvstore.get_mut(®ion_id) { - for cf in 0..3 { - for (k, v) in &source_region_1.data[cf] { - write_kv_in_mem(region, cf, k.as_slice(), v.as_slice()); - } - } - let source_engines = cluster.get_engines(4); - let target_engines = cluster.get_engines(5); - copy_meta_from( - source_engines, - target_engines, - &source_region_1, - region, - new_region_meta.clone(), - ) - .unwrap(); - } else { - panic!("error"); - } - }, - ); - { - let prev_states = maybe_collect_states(&cluster, region_id, None); - assert_eq!( - prev_states.get(&4).unwrap().in_disk_apply_state, - prev_states.get(&5).unwrap().in_disk_apply_state - ); - } + recover_from_peer(&cluster, 4, 5, 1); // Add node 5 to cluster. 
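// recover_from_peer (extracted above) bundles the whole manual recovery:
// clone the region's in-memory data between the mock engine stores, persist
// the meta via copy_meta_from, then assert both stores agree on
// in_disk_apply_state. Only after that is the replacement learner made known
// to PD, which is what starts raft replication to it:
//
//     recover_from_peer(&cluster, 4, 5, 1);
//     pd_client.must_add_peer(1, new_learner_peer(5, 5));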
pd_client.must_add_peer(1, new_learner_peer(5, 5)); @@ -550,7 +559,6 @@ fn test_add_delayed_started_learner_snapshot() { )) .unwrap(); - cluster.must_put(b"z1", b"v1"); check_key( &cluster, From f68cd057feecd84d9e12507f25964ac5a0636098 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Fri, 25 Nov 2022 12:23:54 +0800 Subject: [PATCH 12/13] add snapshot sending condition Signed-off-by: CalvinNeo --- engine_store_ffi/src/observer.rs | 10 ++- engine_tiflash/src/proxy_utils.rs | 2 +- new-mock-engine-store/src/lib.rs | 31 ++++++- proxy_tests/proxy/proxy.rs | 21 ++++- proxy_tests/proxy/region.rs | 142 +++++++++++++++++++++++++----- 5 files changed, 180 insertions(+), 26 deletions(-) diff --git a/engine_store_ffi/src/observer.rs b/engine_store_ffi/src/observer.rs index a706a3caa84..19fb6337210 100644 --- a/engine_store_ffi/src/observer.rs +++ b/engine_store_ffi/src/observer.rs @@ -269,9 +269,13 @@ impl AdminObserver for TiFlashObserver { index, term, ) { - info!("can't flush data, should filter CompactLog"; - "region" => ?ob_ctx.region(), - "req" => ?req, + info!("can't flush data, filter CompactLog"; + "region_id" => ?ob_ctx.region().get_id(), + "region_epoch" => ?ob_ctx.region().get_region_epoch(), + "index" => index, + "term" => term, + "compact_index" => req.get_compact_log().get_compact_index(), + "compact_term" => req.get_compact_log().get_compact_term(), ); return true; } diff --git a/engine_tiflash/src/proxy_utils.rs b/engine_tiflash/src/proxy_utils.rs index 5682831b19c..c44e355ae59 100644 --- a/engine_tiflash/src/proxy_utils.rs +++ b/engine_tiflash/src/proxy_utils.rs @@ -33,7 +33,7 @@ fn cf_to_name(batch: &crate::RocksWriteBatchVec, cf: u32) -> &'static str { fn check_double_write(batch: &crate::RocksWriteBatchVec) { // It will fire if we write by both observer(compat_old_proxy is not enabled) // and TiKV's WriteBatch. 
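// How these fail points act (fail crate semantics): when a point is enabled
// with a closure action, the closure's value becomes an early return from the
// enclosing function. For the write-routing hook that means, as a sketch:
//
//     pub fn do_write(cf: &str, key: &[u8]) -> bool {
//         // When enabled, skip routing and report the write as handled.
//         fail::fail_point!("before_tiflash_do_write", |_| true);
//         // ... normal per-CF routing continues here ...
//         true
//     }
//
// check_double_write returns (), so its closure must evaluate to (); the
// change below simplifies `|_| { return }` to the equivalent `|_| {}`.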
- fail::fail_point!("before_tiflash_check_double_write", |_| { return }); + fail::fail_point!("before_tiflash_check_double_write", |_| {}); tikv_util::debug!("check if double write happens"); for wb in batch.wbs.iter() { for (_, cf, k, _) in wb.iter() { diff --git a/new-mock-engine-store/src/lib.rs b/new-mock-engine-store/src/lib.rs index a0bbcbab2f6..4dec9d67607 100644 --- a/new-mock-engine-store/src/lib.rs +++ b/new-mock-engine-store/src/lib.rs @@ -3,9 +3,13 @@ #![feature(slice_take)] use std::{ + cell::RefCell, collections::{BTreeMap, HashMap, HashSet}, pin::Pin, - sync::{atomic::Ordering, Mutex}, + sync::{ + atomic::{AtomicU64, Ordering}, + Mutex, + }, time::Duration, }; @@ -69,12 +73,18 @@ impl Region { } } +#[derive(Default)] +pub struct RegionStats { + pub pre_handle_count: AtomicU64, +} + pub struct EngineStoreServer { pub id: u64, pub engines: Option>, pub kvstore: HashMap>, pub proxy_compat: bool, pub mock_cfg: MockConfig, + pub region_states: RefCell>, } impl EngineStoreServer { @@ -88,7 +98,18 @@ impl EngineStoreServer { kvstore: Default::default(), proxy_compat: false, mock_cfg: MockConfig::default(), + region_states: RefCell::new(Default::default()), + } + } + + pub fn mutate_region_states(&self, region_id: RegionId, f: F) { + let has = self.region_states.borrow().contains_key(®ion_id); + if !has { + self.region_states + .borrow_mut() + .insert(region_id, Default::default()); } + f(self.region_states.borrow_mut().get_mut(®ion_id).unwrap()) } pub fn get_mem( @@ -1021,6 +1042,14 @@ unsafe extern "C" fn ffi_pre_handle_snapshot( "region" => ?region.region, "snap len" => snaps.len, ); + + (*store.engine_store_server).mutate_region_states( + region.region.get_id(), + |e: &mut RegionStats| { + e.pre_handle_count.fetch_add(1, Ordering::SeqCst); + }, + ); + for i in 0..snaps.len { let snapshot = snaps.views.add(i as usize); let view = &*(snapshot as *mut ffi_interfaces::SSTView); diff --git a/proxy_tests/proxy/proxy.rs b/proxy_tests/proxy/proxy.rs index d5a5aebdbcb..4a11b8a269b 100644 --- a/proxy_tests/proxy/proxy.rs +++ b/proxy_tests/proxy/proxy.rs @@ -1,14 +1,17 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. 
+use std::ops::RangeBounds; pub use std::{ collections::HashMap, io::Write, + iter::FromIterator, ops::DerefMut, path::{Path, PathBuf}, str::FromStr, sync::{atomic::Ordering, mpsc, Arc, RwLock}, }; +pub use collections::HashSet; pub use engine_store_ffi::{KVGetStatus, RaftStoreProxyFFI}; pub use engine_traits::{ MiscExt, Mutable, RaftEngineDebug, RaftLogBatch, WriteBatch, CF_DEFAULT, CF_LOCK, CF_WRITE, @@ -31,7 +34,7 @@ pub use new_mock_engine_store::{ transport_simulate::{ CloneFilterFactory, CollectSnapshotFilter, Direction, RegionPacketFilter, }, - write_kv_in_mem, Cluster, ProxyConfig, Simulator, TestPdClient, + write_kv_in_mem, Cluster, ProxyConfig, RegionStats, Simulator, TestPdClient, }; pub use raft::eraftpb::{ConfChangeType, MessageType}; pub use raftstore::coprocessor::ConsistencyCheckMethod; @@ -358,8 +361,24 @@ pub fn check_apply_state( } pub fn get_valid_compact_index(states: &HashMap) -> (u64, u64) { + get_valid_compact_index_by(states, None) +} + +pub fn get_valid_compact_index_by( + states: &HashMap, + use_nodes: Option>, +) -> (u64, u64) { + let set = use_nodes.map_or(None, |nodes| { + Some(HashSet::from_iter(nodes.clone().into_iter())) + }); states .iter() + .filter(|(k, _)| { + if let Some(ref s) = set { + return s.contains(k); + } + true + }) .map(|(_, s)| { ( s.in_memory_apply_state.get_applied_index(), diff --git a/proxy_tests/proxy/region.rs b/proxy_tests/proxy/region.rs index 1819f112402..e1193b199fb 100644 --- a/proxy_tests/proxy/region.rs +++ b/proxy_tests/proxy/region.rs @@ -398,8 +398,6 @@ pub fn copy_meta_from( .unwrap() .unwrap(); wb.put_msg_cf(CF_RAFT, &keys::apply_state_key(region_id), &apply_state)?; - wb.put_cf(CF_RAFT, "!!!ZZZ".as_bytes(), "VVVVV".as_bytes()) - .unwrap(); target.apply_state = apply_state.clone(); target.applied_term = source.applied_term; } @@ -493,8 +491,26 @@ fn recover_from_peer(cluster: &Cluster, from: u64, to: u64, region_ } } +fn force_compact_log( + cluster: &mut Cluster, + key: &[u8], + use_nodes: Option>, +) -> u64 { + let region = cluster.get_region(key); + let region_id = region.get_id(); + let prev_states = maybe_collect_states(&cluster, region_id, None); + + let (compact_index, compact_term) = get_valid_compact_index_by(&prev_states, use_nodes); + let compact_log = test_raftstore::new_compact_log_request(compact_index, compact_term); + let req = test_raftstore::new_admin_request(region_id, region.get_region_epoch(), compact_log); + let _ = cluster + .call_command_on_leader(req, Duration::from_secs(3)) + .unwrap(); + return compact_index; +} + #[test] -fn test_add_delayed_started_learner_snapshot() { +fn test_add_delayed_started_learner_no_snapshot() { // fail::cfg("before_tiflash_check_double_write", "return").unwrap(); // fail::cfg("before_tiflash_do_write", "return").unwrap(); let (mut cluster, pd_client) = new_later_add_learner_cluster( @@ -514,45 +530,32 @@ fn test_add_delayed_started_learner_snapshot() { must_put_and_check_key_with_generator( &mut cluster, - |i: u64| (format!("k{}", i), (0..10240).map(|_| "X").collect()), + |i: u64| (format!("k{}", i), (0..1024).map(|_| "X").collect()), 10, 20, Some(true), None, Some(vec![1, 2, 3, 4]), ); + cluster.must_transfer_leader(1, new_peer(1, 1)); // Force a compact log, so the leader have to send snapshot if peer 5 not catch // up. 
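// force_compact_log below wraps the compact-log bookkeeping: collect the
// applied states, pick the highest (index, term) already applied by every
// node named in use_nodes via get_valid_compact_index_by, and submit a
// CompactLog admin command, returning the chosen index. Passing only the live
// voters lets a test deliberately compact past a stopped store, e.g.:
//
//     let prev_states = maybe_collect_states(&cluster, 1, Some(vec![4]));
//     assert!(
//         force_compact_log(&mut cluster, b"k1", Some(vec![1, 2, 3]))
//             > prev_states.get(&4).unwrap().in_disk_apply_state.get_applied_index()
//     );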
- let region = cluster.get_region(b"k1"); - let region_id = region.get_id(); - let prev_states = maybe_collect_states(&cluster, region_id, Some(vec![1, 2, 3])); { - let (compact_index, compact_term) = get_valid_compact_index(&prev_states); - assert!(compact_index > 15); - let compact_log = test_raftstore::new_compact_log_request(compact_index, compact_term); - let req = - test_raftstore::new_admin_request(region_id, region.get_region_epoch(), compact_log); - let _ = cluster - .call_command_on_leader(req, Duration::from_secs(3)) - .unwrap(); + assert!(force_compact_log(&mut cluster, b"k1", None) > 15); } - cluster.must_transfer_leader(1, new_peer(1, 1)); - // Simulate 4 is lost, recover its data to node 5. cluster.stop_node(4); later_bootstrap_learner_peer(&mut cluster, vec![5], 1); // After that, we manually compose data, to avoid snapshot sending. - recover_from_peer(&cluster, 4, 5, 1); - // Add node 5 to cluster. pd_client.must_add_peer(1, new_learner_peer(5, 5)); + fail::cfg("apply_on_handle_snapshot_finish_1_1", "panic").unwrap(); // Start store 5. - cluster .start_with(HashSet::from_iter( vec![0, 1, 2, 3].into_iter().map(|x| x as usize), @@ -587,7 +590,106 @@ fn test_add_delayed_started_learner_snapshot() { fail::remove("apply_on_handle_snapshot_finish_1_1"); fail::remove("on_pre_persist_with_finish"); + cluster.shutdown(); // fail::remove("before_tiflash_check_double_write"); // fail::remove("before_tiflash_do_write"); +} + +#[test] +fn test_add_delayed_started_learner_snapshot() { + let (mut cluster, pd_client) = new_later_add_learner_cluster( + |c: &mut Cluster| { + c.must_put(b"k1", b"v1"); + check_key(c, b"k1", b"v1", Some(true), None, Some(vec![1])); + }, + vec![4], + ); + + // Start Leader store 4. + cluster + .start_with(HashSet::from_iter( + vec![0, 1, 2, 4].into_iter().map(|x| x as usize), + )) + .unwrap(); + + must_put_and_check_key_with_generator( + &mut cluster, + |i: u64| (format!("k{}", i), (0..1024).map(|_| "X").collect()), + 10, + 20, + Some(true), + None, + Some(vec![1, 2, 3, 4]), + ); + cluster.must_transfer_leader(1, new_peer(1, 1)); + + // Simulate 4 is lost, recover its data to node 5. + cluster.stop_node(4); + + // Force a compact log, so the leader have to send snapshot if peer 5 not catch + // up. + { + must_put_and_check_key(&mut cluster, 21, 25, Some(true), None, Some(vec![1, 2, 3])); + let prev_states = maybe_collect_states(&cluster, 1, Some(vec![4])); + assert!( + force_compact_log(&mut cluster, b"k1", Some(vec![1, 2, 3])) + > prev_states + .get(&4) + .unwrap() + .in_disk_apply_state + .get_applied_index() + ); + } + + later_bootstrap_learner_peer(&mut cluster, vec![5], 1); + // After that, we manually compose data, to avoid snapshot sending. + recover_from_peer(&cluster, 4, 5, 1); + // Add node 5 to cluster. + pd_client.must_add_peer(1, new_learner_peer(5, 5)); + + // Start store 5. + cluster + .start_with(HashSet::from_iter( + vec![0, 1, 2, 3].into_iter().map(|x| x as usize), + )) + .unwrap(); + + cluster.must_put(b"z1", b"v1"); + check_key( + &cluster, + b"z1", + b"v1", + Some(true), + None, + Some(vec![1, 2, 3, 5]), + ); + + // Check if every node has the correct configuation. 
+ let new_states = maybe_collect_states(&cluster, 1, Some(vec![1, 2, 3, 5])); + assert_eq!(new_states.len(), 4); + for i in new_states.keys() { + assert_eq!( + new_states + .get(i) + .unwrap() + .in_disk_region_state + .get_region() + .get_peers() + .len(), + 1 + 2 /* AddPeer */ + 2 // Learner + ); + } + + iter_ffi_helpers( + &cluster, + Some(vec![5]), + &mut |id: u64, engine: &engine_rocks::RocksEngine, ffi: &mut FFIHelperSet| { + (*ffi.engine_store_server).mutate_region_states(1, |e: &mut RegionStats| { + assert_eq!(e.pre_handle_count.load(Ordering::SeqCst), 1); + }); + }, + ); + + fail::remove("on_pre_persist_with_finish"); cluster.shutdown(); } From 7bf223d21f8e3bbfc3fcec983ece05a7beccda56 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Mon, 28 Nov 2022 12:13:59 +0800 Subject: [PATCH 13/13] add Signed-off-by: CalvinNeo --- proxy_server/src/config.rs | 17 +++++ proxy_server/src/run.rs | 110 +++++++++++++++++++++++++++- proxy_tests/proxy/region.rs | 139 +++++++++++++++++++----------------- 3 files changed, 197 insertions(+), 69 deletions(-) diff --git a/proxy_server/src/config.rs b/proxy_server/src/config.rs index 2908c08f9c2..fb7cf4f12f1 100644 --- a/proxy_server/src/config.rs +++ b/proxy_server/src/config.rs @@ -254,6 +254,22 @@ pub struct ProxyConfig { #[online_config(skip)] pub import: ImportConfig, + + #[online_config(skip)] + pub cloud: TiFlashCloudConfig, +} + +#[derive(Clone, Serialize, Deserialize, PartialEq, Debug)] +#[serde(default)] +#[serde(rename_all = "kebab-case")] +pub struct TiFlashCloudConfig { + pub new_store_id: u64, +} + +impl Default for TiFlashCloudConfig { + fn default() -> TiFlashCloudConfig { + TiFlashCloudConfig { new_store_id: 0 } + } } /// We use custom default, in case of later non-ordinary config items. @@ -269,6 +285,7 @@ impl Default for ProxyConfig { enable_io_snoop: false, readpool: ReadPoolConfig::default(), import: ImportConfig::default(), + cloud: TiFlashCloudConfig::default(), } } } diff --git a/proxy_server/src/run.rs b/proxy_server/src/run.rs index 14a3620465c..71f0a5d82a1 100644 --- a/proxy_server/src/run.rs +++ b/proxy_server/src/run.rs @@ -31,8 +31,8 @@ use engine_store_ffi::{ RaftStoreProxyFFI, RaftStoreProxyFFIHelper, ReadIndexClient, TiFlashEngine, }; use engine_traits::{ - CfOptionsExt, Engines, FlowControlFactorsExt, KvEngine, MiscExt, RaftEngine, TabletFactory, - CF_DEFAULT, CF_LOCK, CF_WRITE, + CfOptionsExt, Engines, FlowControlFactorsExt, Iterable, KvEngine, MiscExt, Peekable, + RaftEngine, TabletFactory, WriteBatchExt, CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE, }; use error_code::ErrorCodeExt; use file_system::{ @@ -43,9 +43,13 @@ use futures::executor::block_on; use grpcio::{EnvBuilder, Environment}; use grpcio_health::HealthService; use kvproto::{ - debugpb::create_debug, diagnosticspb::create_diagnostics, import_sstpb::create_import_sst, + debugpb::create_debug, + diagnosticspb::create_diagnostics, + import_sstpb::create_import_sst, + raft_serverpb::{PeerState, RegionLocalState, StoreIdent}, }; use pd_client::{PdClient, RpcClient}; +use protobuf::Message; use raft_log_engine::RaftLogEngine; use raftstore::{ coprocessor::{config::SplitCheckConfigManager, CoprocessorHost, RegionInfoAccessor}, @@ -1131,6 +1135,12 @@ impl TiKvServer { panic!("engine address is empty"); } + { + let new_store_id = self.proxy_config.cloud.new_store_id; + let engines = self.engines.as_ref().unwrap().engines.clone(); + maybe_recover_from_crash(new_store_id, engines); + } + let mut node = Node::new( self.system.take().unwrap(), &server_config.value().clone(), @@ 
-1601,6 +1611,100 @@ impl TiKvServer { } } +pub fn maybe_recover_from_crash( + new_store_id: u64, + engines: Engines, +) -> raftstore::Result<()> { + if new_store_id == 0 { + return Ok(()); + } + let old_store_ident = match engines.kv.get_msg::<StoreIdent>(keys::STORE_IDENT_KEY)? { + None => { + error!( + "try to recover from an uninitialized store to {}, just quit", + new_store_id + ); + return Ok(()); + } + Some(e) => e, + }; + let old_store_id = old_store_ident.get_store_id(); + + warn!( + "start recovering store from {} to {}", + old_store_id, new_store_id + ); + // For each region: + // TODO should request PD for the current regions. + let mut total_count = 0; + let mut dropped_epoch_count = 0; + let mut dropped_merging_count = 0; + let mut dropped_applying_count = 0; + let mut tombstone_count = 0; + + let mut reused_region: Vec<u64> = vec![]; + let mut pending_clean_region: Vec<u64> = vec![]; + + let start_key = keys::REGION_META_MIN_KEY; + let end_key = keys::REGION_META_MAX_KEY; + engines + .kv + .scan(CF_RAFT, start_key, end_key, false, |key, value| { + let (region_id, suffix) = box_try!(keys::decode_region_meta_key(key)); + if suffix != keys::REGION_STATE_SUFFIX { + return Ok(true); + } + + total_count += 1; + + let mut local_state = RegionLocalState::default(); + local_state.merge_from_bytes(value)?; + + let region = local_state.get_region(); + if local_state.get_state() == PeerState::Tombstone { + tombstone_count += 1; + return Ok(true); + } + if local_state.get_state() == PeerState::Applying { + dropped_applying_count += 1; + pending_clean_region.push(region_id); + return Ok(true); + } + if local_state.get_state() == PeerState::Merging { + dropped_merging_count += 1; + pending_clean_region.push(region_id); + return Ok(true); + } + // TODO add epoch check. + let old_epoch = region.get_region_epoch(); + + Ok(true) + })?; + + warn!("after scan"; + "total_count" => total_count, + "dropped_epoch_count" => dropped_epoch_count, + "dropped_merging_count" => dropped_merging_count, + "dropped_applying_count" => dropped_applying_count, + "tombstone_count" => tombstone_count, + "reused_count" => reused_region.len(), + "pending_clean_count" => pending_clean_region.len(), + ); + for region_id in reused_region { + // TODO Rewrite region peer information. + } + + for region_id in pending_clean_region { + // TODO clean all data, especially segment data in delta layer of this region. + let mut wb = engines.kv.write_batch(); + let mut raft_wb = engines.raft.log_batch(1024); + // TODO check if this works. + // raftstore::store::clear_meta(engines, &mut wb, &mut raft_wb, + // region_id, ); + } + + Ok(()) +} pub trait ConfiguredRaftEngine: RaftEngine { fn build( _: &TikvConfig, diff --git a/proxy_tests/proxy/region.rs index e1193b199fb..4d9628657ab 100644 --- a/proxy_tests/proxy/region.rs +++ b/proxy_tests/proxy/region.rs @@ -6,6 +6,79 @@ use raft::eraftpb::Entry; use crate::proxy::*; +pub fn copy_meta_from_base( + source_engines: &Engines< + impl KvEngine, + impl RaftEngine + engine_traits::Peekable + engine_traits::RaftEngineDebug, + >, + target_engines: &Engines, + new_region_meta: kvproto::metapb::Region, +) -> raftstore::Result<()> { + let region_id = new_region_meta.get_id(); + + let mut wb = target_engines.kv.write_batch(); + let mut raft_wb = target_engines.raft.log_batch(1024); + + // Must not set this key, otherwise it will trigger a cluster bootstrap. 
+ // box_try!(wb.put_msg(keys::PREPARE_BOOTSTRAP_KEY, &new_region_meta)); + + // region local state + let mut state = RegionLocalState::default(); + state.set_region(new_region_meta); + box_try!(wb.put_msg_cf(CF_RAFT, &keys::region_state_key(region_id), &state)); + + // apply state + { + let apply_state: RaftApplyState = + get_apply_state(&source_engines.kv, new_region_meta.get_id()); + wb.put_msg_cf(CF_RAFT, &keys::apply_state_key(region_id), &apply_state)?; + } + + wb.write()?; + target_engines.sync_kv()?; + + // raft state + { + let raft_state: RaftLocalState = + get_raft_local_state(&source_engines.raft, new_region_meta.get_id()); + raft_wb.put_raft_state(region_id, &raft_state)?; + }; + + // raft log + let mut entries: Vec<Entry> = Default::default(); + source_engines + .raft + .scan_entries(region_id, |e| { + debug!("copy raft log"; "e" => ?e); + entries.push(e.clone()); + Ok(true) + }) + .unwrap(); + + raft_wb.append(region_id, entries)?; + box_try!(target_engines.raft.consume(&mut raft_wb, true)); + + Ok(()) +} + +pub fn copy_meta_from( + source_engines: &Engines< + impl KvEngine, + impl RaftEngine + engine_traits::Peekable + RaftEngineDebug, + >, + target_engines: &Engines, + source: &Box, + target: &mut Box, + new_region_meta: kvproto::metapb::Region, +) -> raftstore::Result<()> { + copy_meta_from_base(source_engines, target_engines, new_region_meta.clone())?; + + let apply_state: RaftApplyState = get_apply_state(&source_engines.kv, new_region_meta.get_id()); + target.apply_state = apply_state.clone(); + target.applied_term = source.applied_term; + Ok(()) +} + #[test] fn test_handle_destroy() { let (mut cluster, pd_client) = new_mock_cluster(0, 3); @@ -367,72 +440,6 @@ fn test_add_delayed_started_learner_by_joint() { cluster.shutdown(); } -pub fn copy_meta_from( - source_engines: &Engines< - impl KvEngine, - impl RaftEngine + engine_traits::Peekable + RaftEngineDebug, - >, - target_engines: &Engines, - source: &Box, - target: &mut Box, - new_region_meta: kvproto::metapb::Region, -) -> raftstore::Result<()> { - let region_id = source.region.get_id(); - - let mut wb = target_engines.kv.write_batch(); - let mut raft_wb = target_engines.raft.log_batch(1024); - - // box_try!(wb.put_msg(keys::PREPARE_BOOTSTRAP_KEY, &source.region)); - - // region local state - let mut state = RegionLocalState::default(); - state.set_region(new_region_meta); - box_try!(wb.put_msg_cf(CF_RAFT, &keys::region_state_key(region_id), &state)); - - // apply state - { - let key = keys::apply_state_key(region_id); - let apply_state: RaftApplyState = source_engines - .kv - .get_msg_cf(CF_RAFT, &key) - .unwrap() - .unwrap(); - wb.put_msg_cf(CF_RAFT, &keys::apply_state_key(region_id), &apply_state)?; - target.apply_state = apply_state.clone(); - target.applied_term = source.applied_term; - } - - wb.write()?; - target_engines.sync_kv()?; - - // raft state - { - let key = keys::raft_state_key(region_id); - let raft_state = source_engines - .raft - .get_msg_cf(CF_DEFAULT, &key) - .unwrap() - .unwrap(); - raft_wb.put_raft_state(region_id, &raft_state)?; - }; - - // raft log - let mut entries: Vec = Default::default(); - source_engines - .raft - .scan_entries(region_id, |e| { - debug!("copy raft log"; "e" => ?e); - entries.push(e.clone()); - Ok(true) - }) - .unwrap(); - - raft_wb.append(region_id, entries)?; - box_try!(target_engines.raft.consume(&mut raft_wb, true)); - - Ok(()) -} - fn recover_from_peer(cluster: &Cluster, from: u64, to: u64, region_id: u64) { let source_region_1 = cluster .ffi_helper_set