Basic implementation of recovery from s3 #223

Open
wants to merge 14 commits into base: raftstore-proxy
17 changes: 17 additions & 0 deletions proxy_server/src/config.rs
@@ -254,6 +254,22 @@ pub struct ProxyConfig {

#[online_config(skip)]
pub import: ImportConfig,

#[online_config(skip)]
pub cloud: TiFlashCloudConfig,
}

#[derive(Clone, Serialize, Deserialize, PartialEq, Debug)]
#[serde(default)]
#[serde(rename_all = "kebab-case")]
pub struct TiFlashCloudConfig {
pub new_store_id: u64,
}

impl Default for TiFlashCloudConfig {
fn default() -> TiFlashCloudConfig {
TiFlashCloudConfig { new_store_id: 0 }
}
}

/// We use custom default, in case of later non-ordinary config items.
@@ -269,6 +285,7 @@ impl Default for ProxyConfig {
enable_io_snoop: false,
readpool: ReadPoolConfig::default(),
import: ImportConfig::default(),
cloud: TiFlashCloudConfig::default(),
}
}
}
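For context, the new `cloud.new_store_id` option would be supplied through the proxy's TOML config; with the `rename_all = "kebab-case"` attribute above, the key is `new-store-id` under a `[cloud]` section. Below is a minimal, self-contained sketch of that mapping, using illustrative mirror structs rather than the real `proxy_server::config` types, and assuming the `serde` and `toml` crates:

use serde::Deserialize;

// Illustrative mirrors of the structs in this diff; the real types carry many
// more fields plus the online_config attributes.
#[derive(Deserialize, Debug, Default)]
#[serde(default, rename_all = "kebab-case")]
struct TiFlashCloudConfig {
    new_store_id: u64,
}

#[derive(Deserialize, Debug, Default)]
#[serde(default, rename_all = "kebab-case")]
struct ProxyConfig {
    cloud: TiFlashCloudConfig,
}

fn main() {
    // `new-store-id = 0` (the default) keeps the recovery path disabled.
    let cfg: ProxyConfig = toml::from_str("[cloud]\nnew-store-id = 12345\n").unwrap();
    assert_eq!(cfg.cloud.new_store_id, 12345);
}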
110 changes: 107 additions & 3 deletions proxy_server/src/run.rs
@@ -31,8 +31,8 @@ use engine_store_ffi::{
RaftStoreProxyFFI, RaftStoreProxyFFIHelper, ReadIndexClient, TiFlashEngine,
};
use engine_traits::{
CfOptionsExt, Engines, FlowControlFactorsExt, KvEngine, MiscExt, RaftEngine, TabletFactory,
CF_DEFAULT, CF_LOCK, CF_WRITE,
CfOptionsExt, Engines, FlowControlFactorsExt, Iterable, KvEngine, MiscExt, Peekable,
RaftEngine, TabletFactory, WriteBatchExt, CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE,
};
use error_code::ErrorCodeExt;
use file_system::{
@@ -43,9 +43,13 @@ use futures::executor::block_on;
use grpcio::{EnvBuilder, Environment};
use grpcio_health::HealthService;
use kvproto::{
debugpb::create_debug, diagnosticspb::create_diagnostics, import_sstpb::create_import_sst,
debugpb::create_debug,
diagnosticspb::create_diagnostics,
import_sstpb::create_import_sst,
raft_serverpb::{PeerState, RegionLocalState, StoreIdent},
};
use pd_client::{PdClient, RpcClient};
use protobuf::Message;
use raft_log_engine::RaftLogEngine;
use raftstore::{
coprocessor::{config::SplitCheckConfigManager, CoprocessorHost, RegionInfoAccessor},
@@ -1131,6 +1135,12 @@ impl<ER: RaftEngine> TiKvServer<ER> {
panic!("engine address is empty");
}

{
let new_store_id = self.proxy_config.cloud.new_store_id;
let engines = self.engines.as_ref().unwrap().engines.clone();
maybe_recover_from_crash(new_store_id, engines);
}

let mut node = Node::new(
self.system.take().unwrap(),
&server_config.value().clone(),
@@ -1601,6 +1611,100 @@ impl<ER: RaftEngine> TiKvServer<ER> {
}
}

pub fn maybe_recover_from_crash<EK: KvEngine, ER: RaftEngine>(
new_store_id: u64,
engines: Engines<EK, ER>,
) -> raftstore::Result<()> {
if new_store_id == 0 {
return Ok(());
}
let old_store_ident = match engines.kv.get_msg::<StoreIdent>(keys::STORE_IDENT_KEY)? {
None => {
error!(
"try to recover from an uninitialized store to {}, just quit",
new_store_id
);
return Ok(());
}
Some(e) => e,
};
let old_store_id = old_store_ident.get_store_id();

warn!(
"start recovering store from {} to {}",
old_store_id, new_store_id
);
// For each region.
// TODO: should request PD for the current regions.
let mut total_count = 0;
let mut dropped_epoch_count = 0;
let mut dropped_merging_count = 0;
let mut dropped_applying_count = 0;
let mut tombstone_count = 0;

let mut reused_region: Vec<u64> = vec![];
let mut pending_clean_region: Vec<u64> = vec![];

let start_key = keys::REGION_META_MIN_KEY;
let end_key = keys::REGION_META_MAX_KEY;
engines
.kv
.scan(CF_RAFT, start_key, end_key, false, |key, value| {
let (region_id, suffix) = box_try!(keys::decode_region_meta_key(key));
if suffix != keys::REGION_STATE_SUFFIX {
return Ok(true);
}

total_count += 1;

let mut local_state = RegionLocalState::default();
local_state.merge_from_bytes(value)?;

let region = local_state.get_region();
if local_state.get_state() == PeerState::Tombstone {
tombstone_count += 1;
return Ok(true);
}
if local_state.get_state() == PeerState::Applying {
dropped_applying_count += 1;
pending_clean_region.push(region_id);
return Ok(true);
}
if local_state.get_state() == PeerState::Merging {
dropped_merging_count += 1;
pending_clean_region.push(region_id);
return Ok(true);
}
// TODO: add epoch check.
let _old_epoch = region.get_region_epoch();

Ok(true)
})?;

warn!("after scan";
"total_count" => total_count,
"dropped_epoch_count" => dropped_epoch_count,
"dropped_merging_count" => dropped_merging_count,
"dropped_applying_count" => dropped_applying_count,
"tombstone_count" => tombstone_count,
"reused_count" => reused_region.len(),
"pending_clean_count" => pending_clean_region.len(),
);
for _region_id in reused_region {
// TODO: rewrite region peer information.
}

for region_id in pending_clean_region {
// TODO: clean all data, especially segment data in the delta layer of this region.
let mut wb = engines.kv.write_batch();
let mut raft_wb = engines.raft.log_batch(1024);
// TODO: check if this works.
// raftstore::store::clear_meta(engines, &mut wb, &mut raft_wb,
// region_id, );
}

Ok(())
}
pub trait ConfiguredRaftEngine: RaftEngine {
fn build(
_: &TikvConfig,
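The per-region rewrite and cleanup above are still TODOs. As a rough illustration of the kind of follow-up step this recovery path needs, here is a hedged sketch of rewriting the persisted store ident to the new store id. It reuses only APIs already exercised in this file (`Peekable::get_msg`, `WriteBatchExt::write_batch`, `Mutable::put_msg`) and is not part of this pull request:

use engine_traits::{KvEngine, Mutable, Peekable, WriteBatch, WriteBatchExt};
use kvproto::raft_serverpb::StoreIdent;

/// Sketch only: point the persisted store ident at the new store id.
pub fn rewrite_store_ident<EK: KvEngine>(kv: &EK, new_store_id: u64) -> raftstore::Result<()> {
    // Read the old ident the same way maybe_recover_from_crash does.
    let mut ident = match kv.get_msg::<StoreIdent>(keys::STORE_IDENT_KEY)? {
        Some(ident) => ident,
        // Nothing to rewrite on an uninitialized store.
        None => return Ok(()),
    };
    ident.set_store_id(new_store_id);

    // Persist the updated ident through a write batch, as the rest of the
    // recovery code does for region meta.
    let mut wb = kv.write_batch();
    wb.put_msg(keys::STORE_IDENT_KEY, &ident)?;
    wb.write()?;
    Ok(())
}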
139 changes: 73 additions & 66 deletions proxy_tests/proxy/region.rs
@@ -6,6 +6,79 @@ use raft::eraftpb::Entry;

use crate::proxy::*;

pub fn copy_meta_from_base(
source_engines: &Engines<
impl KvEngine,
impl RaftEngine + engine_traits::Peekable + engine_traits::RaftEngineDebug,
>,
target_engines: &Engines<impl KvEngine, impl RaftEngine>,
new_region_meta: kvproto::metapb::Region,
) -> raftstore::Result<()> {
let region_id = new_region_meta.get_id();

let mut wb = target_engines.kv.write_batch();
let mut raft_wb = target_engines.raft.log_batch(1024);

// We must not set this key, otherwise it will cause a bootstrap of the cluster.
// box_try!(wb.put_msg(keys::PREPARE_BOOTSTRAP_KEY, &source.region));

// region local state
let mut state = RegionLocalState::default();
state.set_region(new_region_meta);
box_try!(wb.put_msg_cf(CF_RAFT, &keys::region_state_key(region_id), &state));

// apply state
{
let apply_state: RaftApplyState =
get_apply_state(&source_engines.kv, region_id);
wb.put_msg_cf(CF_RAFT, &keys::apply_state_key(region_id), &apply_state)?;
}

wb.write()?;
target_engines.sync_kv()?;

// raft state
{
let raft_state: RaftLocalState = source_engines
.raft
.get_msg_cf(CF_DEFAULT, &keys::raft_state_key(region_id))
.unwrap()
.unwrap();
raft_wb.put_raft_state(region_id, &raft_state)?;
};

// raft log
let mut entries: Vec<raft::eraftpb::Entry> = Default::default();
source_engines
.raft
.scan_entries(region_id, |e| {
debug!("copy raft log"; "e" => ?e);
entries.push(e.clone());
Ok(true)
})
.unwrap();

raft_wb.append(region_id, entries)?;
box_try!(target_engines.raft.consume(&mut raft_wb, true));

Ok(())
}

pub fn copy_meta_from(
source_engines: &Engines<
impl KvEngine,
impl RaftEngine + engine_traits::Peekable + RaftEngineDebug,
>,
target_engines: &Engines<impl KvEngine, impl RaftEngine>,
source: &Box<new_mock_engine_store::Region>,
target: &mut Box<new_mock_engine_store::Region>,
new_region_meta: kvproto::metapb::Region,
) -> raftstore::Result<()> {
copy_meta_from_base(source_engines, target_engines, new_region_meta.clone())?;

let apply_state: RaftApplyState = get_apply_state(&source_engines.kv, new_region_meta.get_id());
target.apply_state = apply_state.clone();
target.applied_term = source.applied_term;
Ok(())
}

#[test]
fn test_handle_destroy() {
let (mut cluster, pd_client) = new_mock_cluster(0, 3);
@@ -367,72 +440,6 @@ fn test_add_delayed_started_learner_by_joint() {
cluster.shutdown();
}

pub fn copy_meta_from(
source_engines: &Engines<
impl KvEngine,
impl RaftEngine + engine_traits::Peekable + RaftEngineDebug,
>,
target_engines: &Engines<impl KvEngine, impl RaftEngine>,
source: &Box<new_mock_engine_store::Region>,
target: &mut Box<new_mock_engine_store::Region>,
new_region_meta: kvproto::metapb::Region,
) -> raftstore::Result<()> {
let region_id = source.region.get_id();

let mut wb = target_engines.kv.write_batch();
let mut raft_wb = target_engines.raft.log_batch(1024);

// box_try!(wb.put_msg(keys::PREPARE_BOOTSTRAP_KEY, &source.region));

// region local state
let mut state = RegionLocalState::default();
state.set_region(new_region_meta);
box_try!(wb.put_msg_cf(CF_RAFT, &keys::region_state_key(region_id), &state));

// apply state
{
let key = keys::apply_state_key(region_id);
let apply_state: RaftApplyState = source_engines
.kv
.get_msg_cf(CF_RAFT, &key)
.unwrap()
.unwrap();
wb.put_msg_cf(CF_RAFT, &keys::apply_state_key(region_id), &apply_state)?;
target.apply_state = apply_state.clone();
target.applied_term = source.applied_term;
}

wb.write()?;
target_engines.sync_kv()?;

// raft state
{
let key = keys::raft_state_key(region_id);
let raft_state = source_engines
.raft
.get_msg_cf(CF_DEFAULT, &key)
.unwrap()
.unwrap();
raft_wb.put_raft_state(region_id, &raft_state)?;
};

// raft log
let mut entries: Vec<Entry> = Default::default();
source_engines
.raft
.scan_entries(region_id, |e| {
debug!("copy raft log"; "e" => ?e);
entries.push(e.clone());
Ok(true)
})
.unwrap();

raft_wb.append(region_id, entries)?;
box_try!(target_engines.raft.consume(&mut raft_wb, true));

Ok(())
}

fn recover_from_peer(cluster: &Cluster<NodeCluster>, from: u64, to: u64, region_id: u64) {
let source_region_1 = cluster
.ffi_helper_set
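The refactored test helpers above call a `get_apply_state` utility in place of the inline `get_msg_cf` lookups they replace. For readers following the diff, a hypothetical reconstruction of that helper, rebuilt from the removed inline code (the actual helper in the test utilities may differ in name or signature):

use engine_traits::{Peekable, CF_RAFT};
use kvproto::raft_serverpb::RaftApplyState;

/// Hypothetical stand-in for the `get_apply_state` helper used above,
/// based on the inline lookup in the old `copy_meta_from`.
pub fn get_apply_state(engine: &impl Peekable, region_id: u64) -> RaftApplyState {
    engine
        .get_msg_cf::<RaftApplyState>(CF_RAFT, &keys::apply_state_key(region_id))
        .unwrap()
        .unwrap()
}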