diff --git a/proxy_server/src/config.rs b/proxy_server/src/config.rs index 2908c08f9c2..fb7cf4f12f1 100644 --- a/proxy_server/src/config.rs +++ b/proxy_server/src/config.rs @@ -254,6 +254,22 @@ pub struct ProxyConfig { #[online_config(skip)] pub import: ImportConfig, + + #[online_config(skip)] + pub cloud: TiFlashCloudConfig, +} + +#[derive(Clone, Serialize, Deserialize, PartialEq, Debug)] +#[serde(default)] +#[serde(rename_all = "kebab-case")] +pub struct TiFlashCloudConfig { + pub new_store_id: u64, +} + +impl Default for TiFlashCloudConfig { + fn default() -> TiFlashCloudConfig { + TiFlashCloudConfig { new_store_id: 0 } + } } /// We use custom default, in case of later non-ordinary config items. @@ -269,6 +285,7 @@ impl Default for ProxyConfig { enable_io_snoop: false, readpool: ReadPoolConfig::default(), import: ImportConfig::default(), + cloud: TiFlashCloudConfig::default(), } } } diff --git a/proxy_server/src/run.rs b/proxy_server/src/run.rs index 14a3620465c..71f0a5d82a1 100644 --- a/proxy_server/src/run.rs +++ b/proxy_server/src/run.rs @@ -31,8 +31,8 @@ use engine_store_ffi::{ RaftStoreProxyFFI, RaftStoreProxyFFIHelper, ReadIndexClient, TiFlashEngine, }; use engine_traits::{ - CfOptionsExt, Engines, FlowControlFactorsExt, KvEngine, MiscExt, RaftEngine, TabletFactory, - CF_DEFAULT, CF_LOCK, CF_WRITE, + CfOptionsExt, Engines, FlowControlFactorsExt, Iterable, KvEngine, MiscExt, Peekable, + RaftEngine, TabletFactory, WriteBatchExt, CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE, }; use error_code::ErrorCodeExt; use file_system::{ @@ -43,9 +43,13 @@ use futures::executor::block_on; use grpcio::{EnvBuilder, Environment}; use grpcio_health::HealthService; use kvproto::{ - debugpb::create_debug, diagnosticspb::create_diagnostics, import_sstpb::create_import_sst, + debugpb::create_debug, + diagnosticspb::create_diagnostics, + import_sstpb::create_import_sst, + raft_serverpb::{PeerState, RegionLocalState, StoreIdent}, }; use pd_client::{PdClient, RpcClient}; +use 
protobuf::Message; use raft_log_engine::RaftLogEngine; use raftstore::{ coprocessor::{config::SplitCheckConfigManager, CoprocessorHost, RegionInfoAccessor}, @@ -1131,6 +1135,12 @@ impl TiKvServer { panic!("engine address is empty"); } + { + let new_store_id = self.proxy_config.cloud.new_store_id; + let engines = self.engines.as_ref().unwrap().engines.clone(); + maybe_recover_from_crash(new_store_id, engines); + } + let mut node = Node::new( self.system.take().unwrap(), &server_config.value().clone(), @@ -1601,6 +1611,100 @@ impl TiKvServer { } } +pub fn maybe_recover_from_crash( + new_store_id: u64, + engines: Engines, +) -> raftstore::Result<()> { + if new_store_id == 0 { + return Ok(()); + } + let old_store_ident = match engines.kv.get_msg::<StoreIdent>(keys::STORE_IDENT_KEY)? { + None => { + error!( + "try recover from a non initialized store to {}, just quit", + new_store_id + ); + return Ok(()); + } + Some(e) => e, + }; + let old_store_id = old_store_ident.get_store_id(); + + warn!( + "start recover store from {} to {}", + old_store_id, new_store_id + ); + // For each region + // TODO should request pd for current region. 
+ let mut total_count = 0; + let mut dropped_epoch_count = 0; + let mut dropped_merging_count = 0; + let mut dropped_applying_count = 0; + let mut tombstone_count = 0; + + let mut reused_region: Vec<u64> = vec![]; + let mut pending_clean_region: Vec<u64> = vec![]; + + let start_key = keys::REGION_META_MIN_KEY; + let end_key = keys::REGION_META_MAX_KEY; + engines + .kv + .scan(CF_RAFT, start_key, end_key, false, |key, value| { + let (region_id, suffix) = box_try!(keys::decode_region_meta_key(key)); + if suffix != keys::REGION_STATE_SUFFIX { + return Ok(true); + } + + total_count += 1; + + let mut local_state = RegionLocalState::default(); + local_state.merge_from_bytes(value)?; + + let region = local_state.get_region(); + if local_state.get_state() == PeerState::Tombstone { + tombstone_count += 1; + return Ok(true); + } + if local_state.get_state() == PeerState::Applying { + dropped_applying_count += 1; + pending_clean_region.push(region_id); + return Ok(true); + } + if local_state.get_state() == PeerState::Merging { + dropped_merging_count += 1; + pending_clean_region.push(region_id); + return Ok(true); + } + // TODO add epoch check. + let old_epoch = region.get_region_epoch(); + + Ok(true) + })?; + + warn!("after scan"; + "total_count" => total_count, + "dropped_epoch_count" => dropped_epoch_count, + "dropped_merging_count" => dropped_merging_count, + "dropped_applying_count" => dropped_applying_count, + "tombstone_count" => tombstone_count, + "reused_count" => reused_region.len(), + "pending_clean_count" => pending_clean_region.len(), + ); + for region_id in reused_region { + // TODO Rewrite region peer information. + } + + for region_id in pending_clean_region { + // TODO clean all data, especially segment data in delta layer of this region. + let mut wb = engines.kv.write_batch(); + let mut raft_wb = engines.raft.log_batch(1024); + // TODO check if this works. 
+ // raftstore::store::clear_meta(engines, &mut wb, &mut raft_wb, + // region_id, ); + } + + Ok(()) +} pub trait ConfiguredRaftEngine: RaftEngine { fn build( _: &TikvConfig, diff --git a/proxy_tests/proxy/region.rs b/proxy_tests/proxy/region.rs index e1193b199fb..4d9628657ab 100644 --- a/proxy_tests/proxy/region.rs +++ b/proxy_tests/proxy/region.rs @@ -6,6 +6,79 @@ use raft::eraftpb::Entry; use crate::proxy::*; +pub fn copy_meta_from_base( + source_engines: &Engines< + impl KvEngine, + impl RaftEngine + engine_traits::Peekable + engine_traits::RaftEngineDebug, + >, + target_engines: &Engines, + new_region_meta: kvproto::metapb::Region, +) -> raftstore::Result<()> { + let region_id = new_region_meta.get_id(); + + let mut wb = target_engines.kv.write_batch(); + let mut raft_wb = target_engines.raft.log_batch(1024); + + // Can not set this key, otherwise will cause a bootstrap of cluster. + // box_try!(wb.put_msg(keys::PREPARE_BOOTSTRAP_KEY, &source.region)); + + // region local state + let mut state = RegionLocalState::default(); + state.set_region(new_region_meta); + box_try!(wb.put_msg_cf(CF_RAFT, &keys::region_state_key(region_id), &state)); + + // apply state + { + let apply_state: RaftApplyState = + get_apply_state(&source_engines.kv, new_region_meta.get_id()); + wb.put_msg_cf(CF_RAFT, &keys::apply_state_key(region_id), &apply_state)?; + } + + wb.write()?; + target_engines.sync_kv()?; + + // raft state + { + let raft_state: RaftLocalState = + get_apply_state(&source_engines.raft, new_region_meta.get_id()); + raft_wb.put_raft_state(region_id, &raft_state)?; + }; + + // raft log + let mut entries: Vec<Entry> = Default::default(); + source_engines + .raft + .scan_entries(region_id, |e| { + debug!("copy raft log"; "e" => ?e); + entries.push(e.clone()); + Ok(true) + }) + .unwrap(); + + raft_wb.append(region_id, entries)?; + box_try!(target_engines.raft.consume(&mut raft_wb, true)); + + Ok(()) +} + +pub fn copy_meta_from( + source_engines: &Engines< + impl KvEngine, + impl 
RaftEngine + engine_traits::Peekable + RaftEngineDebug, + >, + target_engines: &Engines, + source: &Box, + target: &mut Box, + new_region_meta: kvproto::metapb::Region, +) -> raftstore::Result<()> { + copy_meta_from_base(source_engines, target_engines, new_region_meta.clone()); + + let apply_state: RaftApplyState = get_apply_state(source_engines.kv, new_region_meta.get_id()); + target.apply_state = apply_state.clone(); + target.applied_term = source.applied_term; + Ok(()) +} + #[test] fn test_handle_destroy() { let (mut cluster, pd_client) = new_mock_cluster(0, 3); @@ -367,72 +440,6 @@ fn test_add_delayed_started_learner_by_joint() { cluster.shutdown(); } -pub fn copy_meta_from( - source_engines: &Engines< - impl KvEngine, - impl RaftEngine + engine_traits::Peekable + RaftEngineDebug, - >, - target_engines: &Engines, - source: &Box, - target: &mut Box, - new_region_meta: kvproto::metapb::Region, -) -> raftstore::Result<()> { - let region_id = source.region.get_id(); - - let mut wb = target_engines.kv.write_batch(); - let mut raft_wb = target_engines.raft.log_batch(1024); - - // box_try!(wb.put_msg(keys::PREPARE_BOOTSTRAP_KEY, &source.region)); - - // region local state - let mut state = RegionLocalState::default(); - state.set_region(new_region_meta); - box_try!(wb.put_msg_cf(CF_RAFT, &keys::region_state_key(region_id), &state)); - - // apply state - { - let key = keys::apply_state_key(region_id); - let apply_state: RaftApplyState = source_engines - .kv - .get_msg_cf(CF_RAFT, &key) - .unwrap() - .unwrap(); - wb.put_msg_cf(CF_RAFT, &keys::apply_state_key(region_id), &apply_state)?; - target.apply_state = apply_state.clone(); - target.applied_term = source.applied_term; - } - - wb.write()?; - target_engines.sync_kv()?; - - // raft state - { - let key = keys::raft_state_key(region_id); - let raft_state = source_engines - .raft - .get_msg_cf(CF_DEFAULT, &key) - .unwrap() - .unwrap(); - raft_wb.put_raft_state(region_id, &raft_state)?; - }; - - // raft log - let mut 
entries: Vec = Default::default(); - source_engines - .raft - .scan_entries(region_id, |e| { - debug!("copy raft log"; "e" => ?e); - entries.push(e.clone()); - Ok(true) - }) - .unwrap(); - - raft_wb.append(region_id, entries)?; - box_try!(target_engines.raft.consume(&mut raft_wb, true)); - - Ok(()) -} - fn recover_from_peer(cluster: &Cluster, from: u64, to: u64, region_id: u64) { let source_region_1 = cluster .ffi_helper_set