Skip to content

Commit

Permalink
fix(consensus): deferred execution fixes (#1039)
Browse files Browse the repository at this point in the history
Description
---
- fix incorrect transaction fee after execution
- fix incorrect resolved substates for rejected transations
- calculate merkle root for block after all executions for a block are
complete
- commit JMT diff as part of block changeset
- fix deferred transaction finalization 
- fix mempool marks transaction as deferred and sends to consensus
instead of ABORTing deferred transactions immediately

Motivation and Context
---
Number of omissions and bugs in deferred transaction execution are fixed
in the PR.

How Has This Been Tested?
---
Manually, commenting out code in the wallet daemon and mempool that
fills or resolves inputs for a transaction and use unversioned inputs.
The results in deferred execution which succeeds after this PR. Tested
creating an account and transferring. Also tested concurrent local-only
conflicting transactions, the second transaction was aborted due to a
lock conflict, this still needs to be investigated.

What process can a PR reviewer use to test or verify this change?
---
It's currently not possible to directly test this on a single-shard
network without forcing transactions to be deferred in code. Mutlishard
deferred is not currently supported because we cannot resolve foreign
inputs.

Breaking Changes
---

- [x] None
- [ ] Requires data directory to be deleted
- [ ] Other - Please specify
  • Loading branch information
sdbondi authored May 22, 2024
1 parent 4b8382f commit 19eecb3
Show file tree
Hide file tree
Showing 17 changed files with 214 additions and 99 deletions.
13 changes: 9 additions & 4 deletions applications/tari_dan_app_utilities/src/transaction_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,8 @@ impl ExecutionOutput {
&self,
inputs: IndexMap<VersionedSubstateId, Substate>,
) -> IndexSet<VersionedSubstateIdLockIntent> {
let mut resolved_inputs = IndexSet::new();
if let Some(diff) = self.result.finalize.accept() {
resolved_inputs = inputs
inputs
.into_iter()
.map(|(versioned_id, _)| {
let lock_flag = if diff.down_iter().any(|(id, _)| *id == versioned_id.substate_id) {
Expand All @@ -68,9 +67,15 @@ impl ExecutionOutput {
};
VersionedSubstateIdLockIntent::new(versioned_id, lock_flag)
})
.collect::<IndexSet<_>>();
.collect()
} else {
// TODO: we might want to have a SubstateLockFlag::None for rejected transactions so that we still know the
// shards involved but do not lock them. We dont actually lock anything for rejected transactions anyway.
inputs
.into_iter()
.map(|(versioned_id, _)| VersionedSubstateIdLockIntent::new(versioned_id, SubstateLockFlag::Read))
.collect()
}
resolved_inputs
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ where
exec_output.outputs,
exec_output.execution_time,
);
info!(target: LOG_TARGET, "Transaction {} executed. {:?}", id,executed);
info!(target: LOG_TARGET, "Transaction {} executed. {}", id,executed.result().finalize.result);
Ok(executed)
}
}
Expand All @@ -96,17 +96,13 @@ impl<TEpochManager, TExecutor> TariDanBlockTransactionExecutor<TEpochManager, TE
Some(version) => {
let id = VersionedSubstateId::new(input.substate_id, version);
let substate = store.get(&id.to_substate_address())?;
info!(target: LOG_TARGET, "Resolved substate: {id}");
resolved_substates.insert(id, substate);
},
None => {
let (id, substate) = self.resolve_local_substate::<TStateStore>(input.substate_id, store)?;
info!(target: LOG_TARGET, "Resolved unversioned substate: {id}");
resolved_substates.insert(id, substate);
// We try to fetch each input from the block "cache", and only hit the DB if the input has not been
// used in the block before
// match self.output_versions.get(input.substate_id()) {
// Some(version) => VersionedSubstateId::new(input.substate_id, *version),
// None =>
// }
},
}
}
Expand Down Expand Up @@ -141,7 +137,7 @@ impl<TEpochManager, TExecutor> TariDanBlockTransactionExecutor<TEpochManager, TE
.map_err(|e| BlockTransactionExecutorError::StateStoreError(e.to_string()))?;
for (id, substate) in inputs {
access
.set_state(&id, substate)
.set_state(id.substate_id(), substate)
.map_err(|e| BlockTransactionExecutorError::StateStoreError(e.to_string()))?;
}
access
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ where

if !foreign.is_empty() {
info!(target: LOG_TARGET, "Unable to execute transaction {} in the mempool because it has foreign inputs: {:?}", transaction.id(), foreign);
return Ok(Err(MempoolError::MustDeferExecution {
return Err(MempoolError::MustDeferExecution {
local_substates,
foreign_substates: foreign,
}));
});
}

info!(target: LOG_TARGET, "🎱 Transaction {} resolved local inputs = [{}]", transaction.id(), local_substates.keys().map(|addr| addr.to_string()).collect::<Vec<_>>().join(", "));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{collections::HashSet, fmt::Display, iter};
use std::{collections::HashSet, fmt, fmt::Display, iter};

use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt};
use log::*;
Expand Down Expand Up @@ -78,6 +78,21 @@ pub enum TransactionExecution {
Deferred { transaction: Transaction },
}

impl Display for TransactionExecution {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
TransactionExecution::Executed { result } => match result {
Ok(executed) => write!(f, "Executed {}: {}", executed.id(), executed.result().finalize.result),
Err(e) => write!(f, "Execution failed: {}", e),
},
TransactionExecution::ExecutionFailure { error, .. } => {
write!(f, "Unexpected Execution failure: {}", error)
},
TransactionExecution::Deferred { transaction } => write!(f, "Deferred: {}", transaction.id()),
}
}
}

#[derive(Debug)]
pub struct MempoolService<TValidator, TExecutedValidator, TExecutor, TSubstateResolver> {
transactions: HashSet<TransactionId>,
Expand Down Expand Up @@ -501,6 +516,7 @@ where
sender_shard,
} = result;

info!(target: LOG_TARGET, "🎱 Transaction {transaction_id} execution: {execution}");
match execution {
TransactionExecution::Executed { result } => {
self.handle_execution_complete(transaction_id, result, should_propagate, sender_shard)
Expand Down
4 changes: 2 additions & 2 deletions dan_layer/common_types/src/committee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,10 @@ impl CommitteeInfo {
}

/// Calculates the number of distinct shards for a given shard set
pub fn count_distinct_shards<'a, I: IntoIterator<Item = &'a SubstateAddress>>(&self, shards: I) -> usize {
pub fn count_distinct_shards<B: Borrow<SubstateAddress>, I: IntoIterator<Item = B>>(&self, shards: I) -> usize {
shards
.into_iter()
.map(|shard| shard.to_shard(self.num_committees))
.map(|shard| shard.borrow().to_shard(self.num_committees))
.collect::<std::collections::HashSet<_>>()
.len()
}
Expand Down
17 changes: 15 additions & 2 deletions dan_layer/consensus/src/hotstuff/block_change_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tari_dan_storage::{
BlockDiff,
LeafBlock,
LockedSubstate,
PendingStateTreeDiff,
QuorumDecision,
SubstateChange,
SubstateRecord,
Expand All @@ -24,6 +25,7 @@ use tari_dan_storage::{
StorageError,
};
use tari_engine_types::substate::SubstateId;
use tari_state_tree::StateHashTreeDiff;
use tari_transaction::TransactionId;

#[derive(Debug, Clone)]
Expand All @@ -38,6 +40,7 @@ pub struct ProposedBlockChangeSet {
block: LeafBlock,
quorum_decision: Option<QuorumDecision>,
block_diff: Vec<SubstateChange>,
state_tree_diff: StateHashTreeDiff,
substate_locks: IndexMap<SubstateId, Vec<LockedSubstate>>,
transaction_changes: IndexMap<TransactionId, TransactionChangeSet>,
}
Expand All @@ -50,6 +53,7 @@ impl ProposedBlockChangeSet {
block_diff: Vec::new(),
substate_locks: IndexMap::new(),
transaction_changes: IndexMap::new(),
state_tree_diff: StateHashTreeDiff::default(),
}
}

Expand All @@ -60,6 +64,11 @@ impl ProposedBlockChangeSet {
self
}

pub fn set_state_tree_diff(&mut self, diff: StateHashTreeDiff) -> &mut Self {
self.state_tree_diff = diff;
self
}

pub fn set_quorum_decision(&mut self, decision: QuorumDecision) -> &mut Self {
self.quorum_decision = Some(decision);
self
Expand Down Expand Up @@ -124,8 +133,12 @@ impl ProposedBlockChangeSet {
TTx: StateStoreWriteTransaction + Deref,
TTx::Target: StateStoreReadTransaction,
{
// Save the diff
BlockDiff::new(self.block.block_id, self.block_diff).insert(tx)?;
let block_diff = BlockDiff::new(self.block.block_id, self.block_diff);
// Store the block diff
block_diff.insert(tx)?;

// Store the tree diff
PendingStateTreeDiff::new(*self.block.block_id(), self.block.height(), self.state_tree_diff).save(tx)?;

// Save locks
SubstateRecord::insert_all_locks(tx, self.block.block_id, self.substate_locks)?;
Expand Down
46 changes: 34 additions & 12 deletions dan_layer/consensus/src/hotstuff/on_propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ where TConsensusSpec: ConsensusSpec
}

let validator = self.epoch_manager.get_our_validator_node(epoch).await?;
let local_committee_shard = self.epoch_manager.get_local_committee_info(epoch).await?;
let local_committee_info = self.epoch_manager.get_local_committee_info(epoch).await?;
let (current_base_layer_block_height, current_base_layer_block_hash) =
self.epoch_manager.current_base_layer_block_info().await?;
let (high_qc, qc_block, locked_block) = self.store.with_read_tx(|tx| {
Expand Down Expand Up @@ -191,7 +191,7 @@ where TConsensusSpec: ConsensusSpec
&leaf_block,
high_qc,
validator.public_key,
&local_committee_shard,
&local_committee_info,
// TODO: This just avoids issues with proposed transactions causing leader failures. Not sure if this
// is a good idea.
is_newview_propose,
Expand All @@ -208,10 +208,20 @@ where TConsensusSpec: ConsensusSpec
executed_transactions.len(),
next_block.id()
);
for executed in executed_transactions.into_values() {
executed
.into_execution_for_block(*next_block.id())
.insert_if_required(tx)?;
for mut executed in executed_transactions.into_values() {
// TODO: This is a hacky workaround, if the executed transaction has no shards after execution, we
// remove it from the pool so that it does not get proposed again. Ideally we should be
// able to catch this in transaction validation.
if local_committee_info.count_distinct_shards(executed.involved_addresses_iter()) == 0 {
self.transaction_pool.remove(tx, *executed.id())?;
executed
.set_abort("Transaction has no involved shards after execution")
.update(tx)?;
} else {
executed
.into_execution_for_block(*next_block.id())
.insert_if_required(tx)?;
}
}

next_block.as_last_proposed().set(tx)?;
Expand Down Expand Up @@ -286,7 +296,7 @@ where TConsensusSpec: ConsensusSpec
executed_transactions: &mut HashMap<TransactionId, ExecutedTransaction>,
) -> Result<Option<Command>, HotStuffError> {
// Execute deferred transaction
let num_involved_shards = if tx_rec.is_deferred() {
if tx_rec.is_deferred() {
info!(
target: LOG_TARGET,
"👨‍🔧 PROPOSE: Executing deferred transaction {}",
Expand All @@ -297,20 +307,32 @@ where TConsensusSpec: ConsensusSpec
// Update the decision so that we can propose it
tx_rec.set_local_decision(executed.decision());
tx_rec.set_initial_evidence(executed.to_initial_evidence());
tx_rec.set_transaction_fee(executed.transaction_fee());
executed_transactions.insert(*executed.id(), executed);
// update involved shards since that may have changed after execution
local_committee_info.count_distinct_shards(tx_rec.atom().evidence.substate_addresses_iter())
} else if tx_rec.current_decision().is_commit() && tx_rec.current_stage().is_new() {
// Executed in mempool add to this blocks executed transactions
// Executed in mempool. Add to this block's executed transactions
let executed = ExecutedTransaction::get(tx, tx_rec.transaction_id())?;
tx_rec.set_local_decision(executed.decision());
tx_rec.set_initial_evidence(executed.to_initial_evidence());
tx_rec.set_transaction_fee(executed.transaction_fee());
executed_transactions.insert(*executed.id(), executed);
local_committee_info.count_distinct_shards(tx_rec.atom().evidence.substate_addresses_iter())
} else {
local_committee_info.count_distinct_shards(tx_rec.atom().evidence.substate_addresses_iter())
// Continue...
};

let num_involved_shards =
local_committee_info.count_distinct_shards(tx_rec.atom().evidence.substate_addresses_iter());

if num_involved_shards == 0 {
warn!(
target: LOG_TARGET,
"Transaction {} has no involved shards, skipping...",
tx_rec.transaction_id(),
);

return Ok(None);
}

// If the transaction is local only, propose LocalOnly. If the transaction is not new, it must have been
// previously prepared in a multi-shard command (TBD if that a valid thing to do).
if num_involved_shards == 1 && !tx_rec.current_stage().is_new() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ where TConsensusSpec: ConsensusSpec
let executed = self.execute_transaction_if_required(&substate_store, &atom.id, block.id())?;
tx_rec.set_local_decision(executed.decision());
tx_rec.set_initial_evidence(executed.to_initial_evidence());
tx_rec.set_transaction_fee(executed.transaction_fee());
proposed_block_change_set.add_transaction_execution(executed);
} else if tx_rec.current_decision().is_commit() &&
matches!(
Expand All @@ -417,6 +418,7 @@ where TConsensusSpec: ConsensusSpec
// Align the TransactionPoolRecord with the relevant execution
tx_rec.set_local_decision(execution.decision());
tx_rec.set_initial_evidence(execution.to_initial_evidence());
tx_rec.set_transaction_fee(execution.transaction_fee());
proposed_block_change_set.add_transaction_execution(execution);
} else {
// continue
Expand Down Expand Up @@ -492,6 +494,8 @@ where TConsensusSpec: ConsensusSpec
block,
tx_rec.transaction_id(),
);
// TODO: Add a reason for the ABORT. Perhaps a reason enum
// Decision::Abort(AbortReason::LockConflict)
tx_rec.set_local_decision(Decision::Abort);
proposed_block_change_set.set_next_transaction_update(
&tx_rec,
Expand Down Expand Up @@ -580,9 +584,14 @@ where TConsensusSpec: ConsensusSpec

let executed = self.execute_transaction_if_required(&substate_store, &atom.id, block.id())?;
tx_rec.set_local_decision(executed.decision());
tx_rec.set_initial_evidence(executed.to_initial_evidence());
tx_rec.set_transaction_fee(executed.transaction_fee());
proposed_block_change_set.add_transaction_execution(executed);
} else {
let executed = ExecutedTransaction::get_pending_execution_for_block(tx, block.id(), &t.id)?;
tx_rec.set_local_decision(executed.decision());
tx_rec.set_initial_evidence(executed.to_initial_evidence());
tx_rec.set_transaction_fee(executed.transaction_fee());
proposed_block_change_set.add_transaction_execution(executed);
}

Expand Down Expand Up @@ -865,9 +874,22 @@ where TConsensusSpec: ConsensusSpec
// return Ok(proposed_block_change_set.no_vote());
}

let (diff, locks) = substate_store.into_diff_and_locks();
let (expected_merkle_root, tree_diff) = substate_store.calculate_jmt_diff_for_block(block)?;
if expected_merkle_root != *block.merkle_root() {
warn!(
target: LOG_TARGET,
"❌ Merkle root disagreement for block {}. Leader proposed {}, we calculated {}",
block,
block.merkle_root(),
expected_merkle_root
);
return Ok(proposed_block_change_set.no_vote());
}

let (diff, locks) = substate_store.into_parts();
proposed_block_change_set
.set_block_diff(diff)
.set_state_tree_diff(tree_diff)
.set_substate_locks(locks)
.set_quorum_decision(QuorumDecision::Accept);

Expand Down Expand Up @@ -1078,7 +1100,7 @@ where TConsensusSpec: ConsensusSpec
let finalized_transactions = self
.transaction_pool
.remove_all(tx, block.all_accepted_transactions_ids())?;
TransactionRecord::finalize_all(tx, &finalized_transactions)?;
TransactionRecord::finalize_all(tx, *block.id(), &finalized_transactions)?;

if !finalized_transactions.is_empty() {
debug!(
Expand Down
Loading

0 comments on commit 19eecb3

Please sign in to comment.