Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions crates/ethcore/src/engines/hbbft/contracts/keygen_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,25 @@ pub fn acks_of_address(
if serialized_ack.is_empty() {
return Err(CallError::ReturnValueInvalid);
}
let deserialized_ack: Ack = bincode::deserialize(&serialized_ack).unwrap();
let outcome = skg
.handle_ack(vmap.get(&address).unwrap(), deserialized_ack)
.unwrap();
let deserialized_ack: Ack = match bincode::deserialize(&serialized_ack) {
Ok(ack) => ack,
Err(e) => {
error!(target: "engine", "Failed to deserialize Ack #{} for address {}: {:?}", n, address, e);
return Err(CallError::ReturnValueInvalid);
}
};

let outcome = match skg.handle_ack(vmap.get(&address).unwrap(), deserialized_ack) {
Ok(s) => s,
Err(e) => {
error!(target: "engine", "Failed to handle Ack #{} for address {}: {:?}", n, address, e);
return Err(CallError::ReturnValueInvalid);
}
};

if let AckOutcome::Invalid(fault) = outcome {
panic!("Expected Ack Outcome to be valid. {:?}", fault);
error!(target: "engine", "Invalid Ack Outcome for #{} for address {}: {:?}", n, address, fault);
return Err(CallError::ReturnValueInvalid);
}
}

Expand Down
9 changes: 5 additions & 4 deletions crates/ethcore/sync/src/chain/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -884,10 +884,11 @@ impl SyncHandler {
return Ok(());
}

if io
.chain()
.transaction_if_readable(&hash, &deadline.time_left())
.is_none()
if !sync.lately_received_transactions.contains(&hash)
&& io
.chain()
.transaction_if_readable(&hash, &deadline.time_left())
.is_none()
{
sync.peers
.get_mut(&peer_id)
Expand Down
11 changes: 11 additions & 0 deletions crates/ethcore/sync/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,8 @@ pub struct ChainSync {
statistics: SyncPropagatorStatistics,
/// memorizing currently pooled transaction to reduce the number of pooled transaction requests.
asking_pooled_transaction_overview: PooledTransactionOverview,
/// memorized lately received transactions to avoid requesting them again.
lately_received_transactions: H256FastSet,
}

#[derive(Debug, Default)]
Expand Down Expand Up @@ -849,6 +851,7 @@ impl ChainSync {
new_transactions_stats_period: config.new_transactions_stats_period,
statistics: SyncPropagatorStatistics::new(),
asking_pooled_transaction_overview: PooledTransactionOverview::new(),
lately_received_transactions: H256FastSet::default(),
};
sync.update_targets(chain);
sync
Expand Down Expand Up @@ -952,6 +955,9 @@ impl ChainSync {
trace!(target: "sync", "Received {:?}", txs.iter().map(|t| t.hash).map(|t| t.0).collect::<Vec<_>>());
}

self.lately_received_transactions
.extend(txs.iter().map(|tx| tx.hash()));

// Remove imported txs from all request queues
let imported = txs.iter().map(|tx| tx.hash()).collect::<H256FastSet>();
for (pid, peer_info) in &mut self.peers {
Expand Down Expand Up @@ -1024,6 +1030,7 @@ impl ChainSync {
// Reactivate peers only if some progress has been made
// since the last sync round of if starting fresh.
self.active_peers = self.peers.keys().cloned().collect();
self.lately_received_transactions.clear();
debug!(target: "sync", "resetting sync state to {:?}", self.state);
}

Expand Down Expand Up @@ -1267,6 +1274,7 @@ impl ChainSync {
debug!(target: "sync", "sync_peer: {} force {} state: {:?}",
peer_id, force, self.state
);

if !self.active_peers.contains(&peer_id) {
trace!(target: "sync", "Skipping deactivated peer {}", peer_id);
return;
Expand Down Expand Up @@ -1441,6 +1449,9 @@ impl ChainSync {
if let Some(peer) = self.peers.get_mut(&peer_id) {
// info: this check should do nothing, if everything is tracked correctly,

peer.unfetched_pooled_transactions
.retain(|h| !self.lately_received_transactions.contains(h));

if peer.asking_pooled_transactions.is_empty() {
// todo: we might just request the same transactions from multiple peers here, at the same time.
// we should keep track of how many replicas of a transaction we had requested.
Expand Down
Loading