Skip to content

Commit 441a5e0

Browse files
authored
Remove dag sync blocks (#4120)
* add delete sync blocks * fix fmt and clippy * return error directly and prolong the duration for reconnection time in reconnect_after_disconnect
1 parent 3180971 commit 441a5e0

File tree

16 files changed

+188
-380
lines changed

16 files changed

+188
-380
lines changed

chain/service/src/chain_service.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -461,9 +461,9 @@ impl ReadableChainService for ChainReaderServiceInner {
461461
let head = self.main.current_header();
462462
if self.main.check_dag_type(&head)? != DagHeaderType::Normal {
463463
bail!(
464-
"The chain is still not a dag and its dag fork number is {} and the current is {:?}.",
464+
"The chain is still not a dag and its dag fork number is {:?} and the current block's header number is {:?}.",
465+
self.main.dag_fork_height()?,
465466
head.number(),
466-
self.main.dag_fork_height()?
467467
);
468468
}
469469
let (dag_genesis, state) = self.main.get_dag_state_by_block(&head)?;

flexidag/tests/tests.rs

Lines changed: 0 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -348,42 +348,6 @@ fn test_dag_tips_store() {
348348
);
349349
}
350350

351-
// #[test]
352-
// fn test_dag_multiple_commits() {
353-
// // initialzie the dag firstly
354-
// let dag = BlockDAG::create_for_testing().unwrap();
355-
356-
// let genesis = BlockHeader::dag_genesis_random()
357-
// .as_builder()
358-
// .with_difficulty(0.into())
359-
// .build();
360-
// dag.init_with_genesis(genesis.clone()).unwrap();
361-
362-
// // normally add the dag blocks
363-
// let mut headers = vec![];
364-
// let mut parents_hash = vec![genesis.id()];
365-
// let mut parent_hash = genesis.id();
366-
// for _ in 0..100 {
367-
// let header_builder = BlockHeaderBuilder::random();
368-
// let header = header_builder
369-
// .with_parent_hash(parent_hash)
370-
// .with_parents_hash(Some(parents_hash.clone()))
371-
// .build();
372-
// parents_hash = vec![header.id()];
373-
// parent_hash = header.id();
374-
// headers.push(header.clone());
375-
// dag.commit(header.to_owned()).unwrap();
376-
// let ghostdata = dag.ghostdata_by_hash(header.id()).unwrap().unwrap();
377-
// }
378-
379-
// for _ in 0..10 {
380-
// for header in &headers {
381-
// let _ = dag.commit(header.clone());
382-
// let _ = dag.ghostdata_by_hash(header.id()).unwrap().unwrap();
383-
// }
384-
// }
385-
// }
386-
387351
#[test]
388352
fn test_dag_multiple_commits() -> anyhow::Result<()> {
389353
set_test_flexidag_fork_height(1);
@@ -746,33 +710,11 @@ fn test_reachability_algorighm() -> anyhow::Result<()> {
746710
hashes.push(child8);
747711
print_reachability_data(reachability_store.read().deref(), &hashes);
748712

749-
// for _i in 7..=31 {
750-
// let s = Hash::random();
751-
// inquirer::add_block(
752-
// &mut reachability_store,
753-
// s,
754-
// child1,
755-
// &mut vec![child1].into_iter(),
756-
// )?;
757-
// hashes.push(s);
758-
// print_reachability_data(&reachability_store, &hashes);
759-
// }
760-
761713
assert!(
762714
dag.check_ancestor_of(origin, vec![child5])?,
763715
"child 5 must be origin's child"
764716
);
765717

766-
// let mut count = 6;
767-
// loop {
768-
// let child = Hash::random();
769-
// inquirer::add_block(&mut reachability_store, child, origin, &mut vec![origin].into_iter())?;
770-
// hashes.push(child);
771-
// print!("{count:?}");
772-
// print_reachability_data(&reachability_store, &hashes);
773-
// count += 1;
774-
// }
775-
776718
Ok(())
777719
}
778720

network-p2p/src/protocol/generic_proto/tests.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
use crate::protocol::generic_proto::{GenericProto, GenericProtoOut};
2222

23+
use anyhow::{bail, format_err, Ok};
2324
use futures::prelude::*;
2425
use libp2p::core::{connection::ConnectionId, transport::MemoryTransport, upgrade};
2526
use libp2p::swarm::behaviour::FromSwarm;
@@ -179,7 +180,7 @@ impl NetworkBehaviour for CustomProtoWithAddr {
179180
}
180181

181182
#[test]
182-
fn reconnect_after_disconnect() {
183+
fn reconnect_after_disconnect() -> anyhow::Result<()> {
183184
// We connect two nodes together, then force a disconnect (through the API of the `Service`),
184185
// check that the disconnect worked, and finally check whether they successfully reconnect.
185186

@@ -223,15 +224,15 @@ fn reconnect_after_disconnect() {
223224
}
224225
}
225226
ServiceState::Disconnected => service1_state = ServiceState::ConnectedAgain,
226-
ServiceState::FirstConnec | ServiceState::ConnectedAgain => panic!(),
227+
ServiceState::FirstConnec | ServiceState::ConnectedAgain => bail!("unexpected"),
227228
},
228229
future::Either::Left(SwarmEvent::Behaviour(
229230
GenericProtoOut::CustomProtocolClosed { .. },
230231
)) => match service1_state {
231232
ServiceState::FirstConnec => service1_state = ServiceState::Disconnected,
232233
ServiceState::ConnectedAgain
233234
| ServiceState::NotConnected
234-
| ServiceState::Disconnected => panic!(),
235+
| ServiceState::Disconnected => bail!("unexpected"),
235236
},
236237
future::Either::Right(SwarmEvent::Behaviour(
237238
GenericProtoOut::CustomProtocolOpen { .. },
@@ -246,15 +247,15 @@ fn reconnect_after_disconnect() {
246247
}
247248
}
248249
ServiceState::Disconnected => service2_state = ServiceState::ConnectedAgain,
249-
ServiceState::FirstConnec | ServiceState::ConnectedAgain => panic!(),
250+
ServiceState::FirstConnec | ServiceState::ConnectedAgain => bail!("unexpected"),
250251
},
251252
future::Either::Right(SwarmEvent::Behaviour(
252253
GenericProtoOut::CustomProtocolClosed { .. },
253254
)) => match service2_state {
254255
ServiceState::FirstConnec => service2_state = ServiceState::Disconnected,
255256
ServiceState::ConnectedAgain
256257
| ServiceState::NotConnected
257-
| ServiceState::Disconnected => panic!(),
258+
| ServiceState::Disconnected => bail!("unexpected"),
258259
},
259260
_ => {}
260261
}
@@ -268,7 +269,7 @@ fn reconnect_after_disconnect() {
268269

269270
// Now that the two services have disconnected and reconnected, wait for 3 seconds and
270271
// check whether they're still connected.
271-
let mut delay = futures_timer::Delay::new(Duration::from_secs(3));
272+
let mut delay = futures_timer::Delay::new(Duration::from_secs(20));
272273

273274
loop {
274275
// Grab next event from services.
@@ -285,9 +286,16 @@ fn reconnect_after_disconnect() {
285286

286287
match event {
287288
SwarmEvent::Behaviour(GenericProtoOut::CustomProtocolOpen { .. })
288-
| SwarmEvent::Behaviour(GenericProtoOut::CustomProtocolClosed { .. }) => panic!(),
289+
| SwarmEvent::Behaviour(GenericProtoOut::CustomProtocolClosed { .. }) => {
290+
bail!("unexpected event: {:?}", event)
291+
}
289292
_ => {}
290293
}
291294
}
292-
});
295+
296+
anyhow::Result::Ok(())
297+
})
298+
.map_err(|e| format_err!("{:?}", e))?;
299+
300+
Ok(())
293301
}

network/tests/network_service_test.rs

Lines changed: 43 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright (c) The Starcoin Core Contributors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use anyhow::anyhow;
4+
use anyhow::{anyhow, bail, format_err, Ok};
55
use futures::stream::StreamExt;
66
use futures_timer::Delay;
77
use network_api::messages::{
@@ -140,7 +140,7 @@ async fn test_connected_nodes() {
140140
}
141141

142142
#[stest::test]
143-
async fn test_event_notify_receive() {
143+
async fn test_event_notify_receive() -> anyhow::Result<()> {
144144
let (network1, network2) = test_helper::build_network_pair().await.unwrap();
145145
// transaction
146146
let msg_send = PeerMessage::new_transactions(
@@ -149,7 +149,10 @@ async fn test_event_notify_receive() {
149149
);
150150
let mut receiver = network2.message_handler.channel();
151151
network1.service_ref.send_peer_message(msg_send.clone());
152-
let msg_receive = receiver.next().await.unwrap();
152+
let msg_receive = receiver
153+
.next()
154+
.await
155+
.ok_or_else(|| format_err!("in network1, receive message timeout or return none"))?;
153156
assert_eq!(msg_send.notification, msg_receive.notification);
154157

155158
//block
@@ -162,12 +165,17 @@ async fn test_event_notify_receive() {
162165
);
163166
let mut receiver = network2.message_handler.channel();
164167
network1.service_ref.send_peer_message(msg_send.clone());
165-
let msg_receive = receiver.next().await.unwrap();
168+
let msg_receive = receiver
169+
.next()
170+
.await
171+
.ok_or_else(|| format_err!("in network2, receive message timeout or return none"))?;
166172
assert_eq!(msg_send.notification, msg_receive.notification);
173+
174+
Ok(())
167175
}
168176

169177
#[stest::test]
170-
async fn test_event_notify_receive_repeat_block() {
178+
async fn test_event_notify_receive_repeat_block() -> anyhow::Result<()> {
171179
let (network1, network2) = test_helper::build_network_pair().await.unwrap();
172180

173181
let block = Block::new(BlockHeader::random(), BlockBody::new_empty());
@@ -189,12 +197,16 @@ async fn test_event_notify_receive_repeat_block() {
189197
assert_eq!(msg_send1.notification, msg_receive1.notification);
190198

191199
//repeat message is filter, so expect timeout error.
192-
let msg_receive2 = async_std::future::timeout(Duration::from_secs(2), receiver.next()).await;
193-
assert!(msg_receive2.is_err());
200+
let result = async_std::future::timeout(Duration::from_secs(1), receiver.next()).await;
201+
if result.is_err() {
202+
Ok(())
203+
} else {
204+
bail!("expect timeout error, but receive message.")
205+
}
194206
}
195207

196208
#[stest::test]
197-
async fn test_event_notify_receive_repeat_transaction() {
209+
async fn test_event_notify_receive_repeat_transaction() -> anyhow::Result<()> {
198210
let (network1, network2) = test_helper::build_network_pair().await.unwrap();
199211

200212
let txn1 = SignedUserTransaction::mock();
@@ -236,8 +248,12 @@ async fn test_event_notify_receive_repeat_transaction() {
236248
);
237249

238250
//msg3 is empty after filter, so expect timeout error.
239-
let msg_receive3 = async_std::future::timeout(Duration::from_secs(1), receiver.next()).await;
240-
assert!(msg_receive3.is_err());
251+
let result = async_std::future::timeout(Duration::from_secs(1), receiver.next()).await;
252+
if result.is_err() {
253+
Ok(())
254+
} else {
255+
bail!("expect timeout error, but receive message.")
256+
}
241257
}
242258

243259
fn mock_block_info(total_difficulty: U256) -> BlockInfo {
@@ -250,7 +266,7 @@ fn mock_block_info(total_difficulty: U256) -> BlockInfo {
250266
}
251267

252268
#[stest::test]
253-
async fn test_event_broadcast() {
269+
async fn test_event_broadcast() -> anyhow::Result<()> {
254270
let mut nodes = test_helper::build_network_cluster(3).await.unwrap();
255271
let node3 = nodes.pop().unwrap();
256272
let node2 = nodes.pop().unwrap();
@@ -268,22 +284,34 @@ async fn test_event_broadcast() {
268284
)));
269285
node1.service_ref.broadcast(notification.clone());
270286

271-
let msg_receive2 = receiver2.next().await.unwrap();
287+
let msg_receive2 = receiver2
288+
.next()
289+
.await
290+
.ok_or_else(|| format_err!("in receive2, receive message timeout or return none"))?;
272291
assert_eq!(notification, msg_receive2.notification);
273292

274-
let msg_receive3 = receiver3.next().await.unwrap();
293+
let msg_receive3 = receiver3
294+
.next()
295+
.await
296+
.ok_or_else(|| format_err!("in receive3, receive message timeout or return none"))?;
275297
assert_eq!(notification, msg_receive3.notification);
276298

277299
//repeat broadcast
278300
node2.service_ref.broadcast(notification.clone());
279301

280302
let msg_receive1 = async_std::future::timeout(Duration::from_secs(1), receiver1.next()).await;
281-
assert!(msg_receive1.is_err());
303+
if msg_receive1.is_ok() {
304+
bail!("expect timeout error, but receive message.")
305+
}
282306

283307
let msg_receive3 = async_std::future::timeout(Duration::from_secs(1), receiver3.next()).await;
284-
assert!(msg_receive3.is_err());
308+
if msg_receive3.is_ok() {
309+
bail!("expect timeout error, but receive message.")
310+
}
285311

286312
print!("{:?}", node1.config.metrics.registry().unwrap().gather());
313+
314+
Ok(())
287315
}
288316

289317
#[stest::test]

storage/src/block/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,10 @@ impl BlockStorage {
408408
self.dag_sync_block_storage.remove(block_id)
409409
}
410410

411+
pub fn delete_all_dag_sync_blocks(&self) -> Result<()> {
412+
self.dag_sync_block_storage.remove_all()
413+
}
414+
411415
pub fn get_dag_sync_block(&self, block_id: HashValue) -> Result<Option<DagSyncBlock>> {
412416
self.dag_sync_block_storage.get(block_id)
413417
}

storage/src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,7 @@ pub trait BlockStore {
292292

293293
fn save_dag_sync_block(&self, block: DagSyncBlock) -> Result<()>;
294294
fn delete_dag_sync_block(&self, block_id: HashValue) -> Result<()>;
295+
fn delete_all_dag_sync_blocks(&self) -> Result<()>;
295296
fn get_dag_sync_block(&self, block_id: HashValue) -> Result<Option<DagSyncBlock>>;
296297
}
297298

@@ -548,6 +549,10 @@ impl BlockStore for Storage {
548549
self.block_storage.delete_dag_sync_block(block_id)
549550
}
550551

552+
fn delete_all_dag_sync_blocks(&self) -> Result<()> {
553+
self.block_storage.delete_all_dag_sync_blocks()
554+
}
555+
551556
fn get_dag_sync_block(&self, block_id: HashValue) -> Result<Option<DagSyncBlock>> {
552557
self.block_storage.get_dag_sync_block(block_id)
553558
}

storage/src/storage.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use byteorder::{BigEndian, ReadBytesExt};
1212
use rocksdb::{DBPinnableSlice, WriteBatch as DBWriteBatch};
1313
use starcoin_config::NodeConfig;
1414
use starcoin_crypto::HashValue;
15-
use starcoin_logger::prelude::info;
15+
use starcoin_logger::prelude::{debug, info};
1616
use starcoin_vm_types::state_store::table::TableHandle;
1717
use std::{convert::TryInto, fmt::Debug, marker::PhantomData, sync::Arc};
1818

@@ -517,6 +517,8 @@ where
517517
fn get_raw(&self, key: K) -> Result<Option<Vec<u8>>>;
518518

519519
fn iter(&self) -> Result<SchemaIterator<K, V>>;
520+
521+
fn remove_all(&self) -> Result<()>;
520522
}
521523

522524
impl KeyCodec for u64 {
@@ -660,4 +662,24 @@ where
660662
.ok_or_else(|| format_err!("Only support scan on db storage instance"))?;
661663
db.iter::<K, V>(self.get_store().prefix_name)
662664
}
665+
666+
fn remove_all(&self) -> Result<()> {
667+
if let Some(db) = self.get_store().storage().db() {
668+
let mut iter = db.iter::<K, V>(self.get_store().prefix_name)?;
669+
iter.seek_to_first();
670+
for result_item in iter {
671+
match result_item {
672+
Ok(item) => {
673+
let (key, _) = item;
674+
self.remove(key)?;
675+
}
676+
Err(e) => {
677+
debug!("finish to remove all keys in db with an error: {:?}", e);
678+
}
679+
}
680+
}
681+
}
682+
683+
Ok(())
684+
}
663685
}

0 commit comments

Comments
 (0)