Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: Eval EXEC <execvy@gmail.com>
  • Loading branch information
eval-exec committed Dec 14, 2024
1 parent a12cd96 commit 80d8b6c
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 19 deletions.
26 changes: 21 additions & 5 deletions sync/src/synchronizer/get_headers_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,33 @@ impl<'a> GetHeadersProcess<'a> {
);

self.synchronizer.peers().getheaders_received(self.peer);
let headers_vec: Vec<Vec<core::HeaderView>> =
active_chain.get_locator_responses(block_number, &hash_stop);
// response headers

debug!("headers len={}", headers_vec.len());
for headers in headers_vec {
let hash_size: packed::Uint32 = 20_u32.pack();
let length_20_for_test = packed::Byte32::new_unchecked(hash_size.as_bytes());
if hash_stop.eq(&length_20_for_test) {
let headers: Vec<core::HeaderView> =
active_chain.get_locator_response(block_number, &hash_stop);
// response headers

debug!("headers len={}", headers.len());
let content = packed::SendHeaders::new_builder()
.headers(headers.into_iter().map(|x| x.data()).pack())
.build();
let message = packed::SyncMessage::new_builder().set(content).build();
attempt!(send_message_to(self.nc, self.peer, &message));
} else {
let headers_vec: Vec<Vec<core::HeaderView>> =
active_chain.get_locator_responses(block_number, &hash_stop);
// response headers

debug!("headers vec len={}", headers_vec.len());
for headers in headers_vec {
let content = packed::SendHeaders::new_builder()
.headers(headers.into_iter().map(|x| x.data()).pack())
.build();
let message = packed::SyncMessage::new_builder().set(content).build();
attempt!(send_message_to(self.nc, self.peer, &message));
}
}
} else {
return StatusCode::GetHeadersMissCommonAncestors
Expand Down
27 changes: 19 additions & 8 deletions sync/src/synchronizer/headers_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,13 @@ impl<'a> HeadersProcess<'a> {
true
}

fn is_parent_exists(&self, first_header: &core:HeaderView) -> bool {
fn is_parent_exists(&self, first_header: &core::HeaderView) -> bool {
let shared: &SyncShared = self.synchronizer.shared();
shared.get_header_fields(first_header.parent_hash).is_some()
shared
.get_header_fields(&first_header.parent_hash())
.is_some()
}


pub fn accept_first(&self, first: &core::HeaderView) -> ValidationResult {
let shared: &SyncShared = self.synchronizer.shared();
let verifier = HeaderVerifier::new(shared, shared.consensus());
Expand Down Expand Up @@ -98,15 +99,19 @@ impl<'a> HeadersProcess<'a> {

pub fn execute(self) -> Status {
debug!("HeadersProcess begins");
let shared: &SyncShared = self.synchronizer.shared();
let consensus = shared.consensus();
let headers = self
.message
.headers()
.to_entity()
.into_iter()
.map(packed::Header::into_view)
.collect::<Vec<_>>();
self.execute_inner(headers)
}

fn execute_inner(self, headers: Vec<core::HeaderView>) -> Status {
let shared: &SyncShared = self.synchronizer.shared();
let consensus = shared.consensus();

if headers.len() > MAX_HEADERS_LEN {
warn!("HeadersProcess is oversized");
Expand Down Expand Up @@ -136,7 +141,9 @@ impl<'a> HeadersProcess<'a> {

if !self.is_parent_exists(&headers[0]) {
// put the headers into a memory cache
self.synchronizer.header_cache.insert(headers[0].parent_hash, headers);
self.synchronizer
.header_cache
.insert(headers[0].parent_hash(), headers);
// verify them later
return Status::ok();
}
Expand Down Expand Up @@ -226,8 +233,12 @@ impl<'a> HeadersProcess<'a> {
{
// these headers verify success
// may the headers's tail header_hash exist in headers_cahce?
if let Some(headers) = self.synchronizer.headers_cache.get(headers.last().expect("last header must exist").hash){
HeadersProcess::new().execute();
if let Some(headers) = self
.synchronizer
.header_cache
.get(&headers.last().expect("last header must exist").hash())
{
return self.execute_inner(headers.to_owned());
}
}

Expand Down
6 changes: 5 additions & 1 deletion sync/src/synchronizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@ use ckb_systemtime::unix_time_as_millis;

#[cfg(test)]
use ckb_types::core;
use ckb_types::core::HeaderView;
use ckb_types::{
core::BlockNumber,
packed::{self, Byte32},
prelude::*,
};
use dashmap::DashMap;
use std::{
collections::HashSet,
sync::{atomic::Ordering, Arc},
Expand Down Expand Up @@ -303,7 +305,7 @@ pub struct Synchronizer {
pub shared: Arc<SyncShared>,

// First Headers's parent_hash -> Headers
pub(crate) header_cache: HashMap<Byte32, Vec<Header>>,
pub(crate) header_cache: DashMap<Byte32, Vec<HeaderView>>,
fetch_channel: Option<channel::Sender<FetchCMD>>,
}

Expand All @@ -312,10 +314,12 @@ impl Synchronizer {
///
/// This is a runtime sync protocol shared state, and any Sync protocol messages will be processed and forwarded by it
pub fn new(chain: ChainController, shared: Arc<SyncShared>) -> Synchronizer {
let header_cache = DashMap::new();
Synchronizer {
chain,
shared,
fetch_channel: None,
header_cache,
}
}

Expand Down
17 changes: 12 additions & 5 deletions sync/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1912,11 +1912,16 @@ impl ActiveChain {
pub fn get_locator_responses(
&self,
block_number: BlockNumber,
hash_stop: &Byte32,
_hash_stop: &Byte32,
) -> Vec<Vec<core::HeaderView>> {
(0..32).iter().map(|index| {
get_locator_response(block_number + (i * MAX_HEADERS_LEN), &Byte32::default())
}).collect();
(0..32)
.map(|index| {
self.get_locator_response(
block_number + (index as u64 * MAX_HEADERS_LEN as u64),
&Byte32::default(),
)
})
.collect()
}

pub fn send_getheaders_to_peer(
Expand Down Expand Up @@ -1955,9 +1960,11 @@ impl ActiveChain {
block_number_and_hash.hash()
);
let locator_hash = self.get_locator(block_number_and_hash);
let hash_size: packed::Uint32 = 20_u32.pack();
let length_20_for_test = packed::Byte32::new_unchecked(hash_size.as_bytes());
let content = packed::GetHeaders::new_builder()
.block_locator_hashes(locator_hash.pack())
.hash_stop(packed::Byte32::zero())
.hash_stop(length_20_for_test)
.build();
let message = packed::SyncMessage::new_builder().set(content).build();
let _status = send_message(SupportProtocols::Sync.protocol_id(), nc, peer, &message);
Expand Down

0 comments on commit 80d8b6c

Please sign in to comment.