Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiple nonces #466

Merged
merged 6 commits into from
Dec 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 8 additions & 27 deletions coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ use ::tributary::{
};

mod tributary;
use crate::tributary::{
TributarySpec, SignData, Transaction, NonceDecider, scanner::RecognizedIdType, PlanIds,
};
use crate::tributary::{TributarySpec, SignData, Transaction, scanner::RecognizedIdType, PlanIds};

mod db;
use db::MainDb;
Expand Down Expand Up @@ -135,14 +133,14 @@ async fn publish_signed_transaction<D: Db, P: P2p>(
) {
log::debug!("publishing transaction {}", hex::encode(tx.hash()));

let signer = if let TransactionKind::Signed(signed) = tx.kind() {
let (order, signer) = if let TransactionKind::Signed(order, signed) = tx.kind() {
let signer = signed.signer;

// Safe as we should deterministically create transactions, meaning if this is already on-disk,
// it's what we're saving now
MainDb::<D>::save_signed_transaction(txn, signed.nonce, tx);

signer
(order, signer)
} else {
panic!("non-signed transaction passed to publish_signed_transaction");
};
Expand All @@ -152,7 +150,7 @@ async fn publish_signed_transaction<D: Db, P: P2p>(
while let Some(tx) = MainDb::<D>::take_signed_transaction(
txn,
tributary
.next_nonce(signer)
.next_nonce(&signer, &order)
.await
.expect("we don't have a nonce, meaning we aren't a participant on this tributary"),
) {
Expand Down Expand Up @@ -697,25 +695,8 @@ async fn handle_processor_message<D: Db, P: P2p>(
Err(e) => panic!("created an invalid unsigned transaction: {e:?}"),
}
}
TransactionKind::Signed(_) => {
log::trace!("getting next nonce for Tributary TX in response to processor message");

let nonce = loop {
let Some(nonce) =
NonceDecider::nonce(&txn, genesis, &tx).expect("signed TX didn't have nonce")
else {
// This can be None if the following events occur, in order:
// 1) We scanned the relevant transaction(s) in a Tributary block
// 2) The processor was sent a message and responded
// 3) The Tributary TXN has yet to be committed
log::warn!("nonce has yet to be saved for processor-instigated transaction");
sleep(Duration::from_millis(100)).await;
continue;
};
break nonce;
};
tx.sign(&mut OsRng, genesis, key, nonce);

TransactionKind::Signed(_, _) => {
tx.sign(&mut OsRng, genesis, key);
publish_signed_transaction(&mut txn, tributary, tx).await;
}
}
Expand Down Expand Up @@ -1066,7 +1047,7 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
}
});

move |set: ValidatorSet, genesis, id_type, id: Vec<u8>, nonce| {
move |set: ValidatorSet, genesis, id_type, id: Vec<u8>| {
log::debug!("recognized ID {:?} {}", id_type, hex::encode(&id));
let mut raw_db = raw_db.clone();
let key = key.clone();
Expand Down Expand Up @@ -1104,7 +1085,7 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
}),
};

tx.sign(&mut OsRng, genesis, &key, nonce);
tx.sign(&mut OsRng, genesis, &key);

let mut first = true;
loop {
Expand Down
16 changes: 8 additions & 8 deletions coordinator/src/tests/tributary/dkg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async fn dkg_test() {

let mut tx =
Transaction::DkgCommitments(attempt, vec![commitments], Transaction::empty_signed());
tx.sign(&mut OsRng, spec.genesis(), key, 0);
tx.sign(&mut OsRng, spec.genesis(), key);
txs.push(tx);
}

Expand Down Expand Up @@ -88,7 +88,7 @@ async fn dkg_test() {
handle_new_blocks::<_, _, _, _, _, _, LocalP2p>(
&mut scanner_db,
key,
|_, _, _, _, _| async {
|_, _, _, _| async {
panic!("provided TX caused recognized_id to be called in new_processors")
},
&processors,
Expand All @@ -114,7 +114,7 @@ async fn dkg_test() {
handle_new_blocks::<_, _, _, _, _, _, LocalP2p>(
&mut scanner_db,
&keys[0],
|_, _, _, _, _| async {
|_, _, _, _| async {
panic!("provided TX caused recognized_id to be called after Commitments")
},
&processors,
Expand Down Expand Up @@ -177,7 +177,7 @@ async fn dkg_test() {
confirmation_nonces: crate::tributary::dkg_confirmation_nonces(key, &spec, 0),
signed: Transaction::empty_signed(),
};
tx.sign(&mut OsRng, spec.genesis(), key, 1);
tx.sign(&mut OsRng, spec.genesis(), key);
txs.push(tx);
}

Expand All @@ -193,7 +193,7 @@ async fn dkg_test() {
handle_new_blocks::<_, _, _, _, _, _, LocalP2p>(
&mut scanner_db,
&keys[0],
|_, _, _, _, _| async {
|_, _, _, _| async {
panic!("provided TX caused recognized_id to be called after some shares")
},
&processors,
Expand Down Expand Up @@ -241,7 +241,7 @@ async fn dkg_test() {
handle_new_blocks::<_, _, _, _, _, _, LocalP2p>(
&mut scanner_db,
&keys[0],
|_, _, _, _, _| async { panic!("provided TX caused recognized_id to be called after shares") },
|_, _, _, _| async { panic!("provided TX caused recognized_id to be called after shares") },
&processors,
|_, _| async { panic!("test tried to publish a new Serai TX") },
&spec,
Expand Down Expand Up @@ -296,7 +296,7 @@ async fn dkg_test() {
txn.commit();

let mut tx = Transaction::DkgConfirmed(attempt, share, Transaction::empty_signed());
tx.sign(&mut OsRng, spec.genesis(), key, 2);
tx.sign(&mut OsRng, spec.genesis(), key);
txs.push(tx);
}
let block_before_tx = tributaries[0].1.tip().await;
Expand All @@ -311,7 +311,7 @@ async fn dkg_test() {
handle_new_blocks::<_, _, _, _, _, _, LocalP2p>(
&mut scanner_db,
&keys[0],
|_, _, _, _, _| async {
|_, _, _, _| async {
panic!("provided TX caused recognized_id to be called after DKG confirmation")
},
&processors,
Expand Down
53 changes: 39 additions & 14 deletions coordinator/src/tests/tributary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use rand_core::{RngCore, OsRng};
use scale::{Encode, Decode};
use processor_messages::coordinator::SubstrateSignableId;

use tributary::{ReadWrite, tests::random_signed};
use tributary::{ReadWrite, tests::random_signed_with_nonce};

use crate::tributary::{SignData, Transaction};

Expand Down Expand Up @@ -34,6 +34,7 @@ fn random_vec<R: RngCore>(rng: &mut R, limit: usize) -> Vec<u8> {
fn random_sign_data<R: RngCore, Id: Clone + PartialEq + Eq + Debug + Encode + Decode>(
rng: &mut R,
plan: Id,
nonce: u32,
) -> SignData<Id> {
SignData {
plan,
Expand All @@ -47,7 +48,7 @@ fn random_sign_data<R: RngCore, Id: Clone + PartialEq + Eq + Debug + Encode + De
res
},

signed: random_signed(&mut OsRng),
signed: random_signed_with_nonce(&mut OsRng, nonce),
}
}

Expand Down Expand Up @@ -83,18 +84,40 @@ fn tx_size_limit() {

#[test]
fn serialize_sign_data() {
fn test_read_write<Id: Clone + PartialEq + Eq + Debug + Encode + Decode>(value: SignData<Id>) {
let mut buf = vec![];
value.write(&mut buf).unwrap();
assert_eq!(value, SignData::read(&mut buf.as_slice(), value.signed.nonce).unwrap())
}

let mut plan = [0; 3];
OsRng.fill_bytes(&mut plan);
test_read_write(random_sign_data::<_, _>(&mut OsRng, plan));
test_read_write(random_sign_data::<_, _>(
&mut OsRng,
plan,
u32::try_from(OsRng.next_u64() >> 32).unwrap(),
));
let mut plan = [0; 5];
OsRng.fill_bytes(&mut plan);
test_read_write(random_sign_data::<_, _>(&mut OsRng, plan));
test_read_write(random_sign_data::<_, _>(
&mut OsRng,
plan,
u32::try_from(OsRng.next_u64() >> 32).unwrap(),
));
let mut plan = [0; 8];
OsRng.fill_bytes(&mut plan);
test_read_write(random_sign_data::<_, _>(&mut OsRng, plan));
test_read_write(random_sign_data::<_, _>(
&mut OsRng,
plan,
u32::try_from(OsRng.next_u64() >> 32).unwrap(),
));
let mut plan = [0; 24];
OsRng.fill_bytes(&mut plan);
test_read_write(random_sign_data::<_, _>(&mut OsRng, plan));
test_read_write(random_sign_data::<_, _>(
&mut OsRng,
plan,
u32::try_from(OsRng.next_u64() >> 32).unwrap(),
));
}

#[test]
Expand All @@ -114,7 +137,7 @@ fn serialize_transaction() {
test_read_write(Transaction::DkgCommitments(
random_u32(&mut OsRng),
commitments,
random_signed(&mut OsRng),
random_signed_with_nonce(&mut OsRng, 0),
));
}

Expand Down Expand Up @@ -145,7 +168,7 @@ fn serialize_transaction() {
OsRng.fill_bytes(&mut nonces);
nonces
},
signed: random_signed(&mut OsRng),
signed: random_signed_with_nonce(&mut OsRng, 1),
});
}

Expand All @@ -165,7 +188,7 @@ fn serialize_transaction() {
} else {
Some(random_vec(&mut OsRng, 500)).filter(|blame| !blame.is_empty())
},
signed: random_signed(&mut OsRng),
signed: random_signed_with_nonce(&mut OsRng, 2),
});
}

Expand All @@ -176,7 +199,7 @@ fn serialize_transaction() {
OsRng.fill_bytes(&mut share);
share
},
random_signed(&mut OsRng),
random_signed_with_nonce(&mut OsRng, 2),
));

{
Expand All @@ -200,6 +223,7 @@ fn serialize_transaction() {
test_read_write(Transaction::SubstratePreprocess(random_sign_data(
&mut OsRng,
SubstrateSignableId::Batch(plan),
0,
)));
}
{
Expand All @@ -208,18 +232,19 @@ fn serialize_transaction() {
test_read_write(Transaction::SubstrateShare(random_sign_data(
&mut OsRng,
SubstrateSignableId::Batch(plan),
1,
)));
}

{
let mut plan = [0; 32];
OsRng.fill_bytes(&mut plan);
test_read_write(Transaction::SignPreprocess(random_sign_data(&mut OsRng, plan)));
test_read_write(Transaction::SignPreprocess(random_sign_data(&mut OsRng, plan, 0)));
}
{
let mut plan = [0; 32];
OsRng.fill_bytes(&mut plan);
test_read_write(Transaction::SignShare(random_sign_data(&mut OsRng, plan)));
test_read_write(Transaction::SignShare(random_sign_data(&mut OsRng, plan, 1)));
}

{
Expand All @@ -230,8 +255,8 @@ fn serialize_transaction() {
test_read_write(Transaction::SignCompleted {
plan,
tx_hash,
first_signer: random_signed(&mut OsRng).signer,
signature: random_signed(&mut OsRng).signature,
first_signer: random_signed_with_nonce(&mut OsRng, 2).signer,
signature: random_signed_with_nonce(&mut OsRng, 2).signature,
});
}
}
2 changes: 1 addition & 1 deletion coordinator/src/tests/tributary/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async fn tx_test() {
let block_before_tx = tributaries[sender].1.tip().await;
let mut tx =
Transaction::DkgCommitments(attempt, vec![commitments.clone()], Transaction::empty_signed());
tx.sign(&mut OsRng, spec.genesis(), &key, 0);
tx.sign(&mut OsRng, spec.genesis(), &key);

assert_eq!(tributaries[sender].1.add_transaction(tx.clone()).await, Ok(true));
let included_in = wait_for_tx_inclusion(&tributaries[sender].1, block_before_tx, tx.hash()).await;
Expand Down
22 changes: 6 additions & 16 deletions coordinator/src/tributary/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use crate::{
processors::Processors,
tributary::{
Transaction, TributarySpec, SeraiBlockNumber, Topic, DataSpecification, DataSet, Accumulation,
nonce_decider::NonceDecider,
dkg_confirmer::DkgConfirmer,
scanner::{RecognizedIdType, RIDTrait},
FatallySlashed, DkgShare, PlanIds, ConfirmationNonces, AttemptDb, DataDb,
Expand Down Expand Up @@ -154,7 +153,7 @@ pub(crate) async fn handle_application_tx<
// Don't handle transactions from fatally slashed participants
// TODO: Because fatally slashed participants can still publish onto the blockchain, they have
// a notable DoS ability
if let TransactionKind::Signed(signed) = tx.kind() {
if let TransactionKind::Signed(_, signed) = tx.kind() {
if FatallySlashed::get(txn, genesis, signed.signer.to_bytes()).is_some() {
return;
}
Expand All @@ -176,6 +175,8 @@ pub(crate) async fn handle_application_tx<
};

// If they've already published a TX for this attempt, slash
// This shouldn't be reachable since nonces were made inserted by the coordinator, yet it's a
// cheap check to leave in for safety
if DataDb::get(txn, genesis, data_spec, &signed.signer.to_bytes()).is_some() {
fatal_slash::<D>(txn, genesis, signed.signer.to_bytes(), "published data multiple times");
return Accumulation::NotReady;
Expand Down Expand Up @@ -410,7 +411,6 @@ pub(crate) async fn handle_application_tx<
}
}

// TODO: Only accept one of either InvalidDkgShare/DkgConfirmed per signer
// TODO: Ban self-accusals
Transaction::InvalidDkgShare { attempt, accuser, faulty, blame, signed } => {
let range = spec.i(signed.signer).unwrap();
Expand Down Expand Up @@ -498,11 +498,6 @@ pub(crate) async fn handle_application_tx<
genesis,
Topic::SubstrateSign(SubstrateSignableId::CosigningSubstrateBlock(hash)),
);
NonceDecider::handle_substrate_signable(
txn,
genesis,
SubstrateSignableId::CosigningSubstrateBlock(hash),
);

let block_number = SeraiBlockNumber::get(txn, hash)
.expect("CosignSubstrateBlock yet didn't save Serai block number");
Expand All @@ -528,9 +523,7 @@ pub(crate) async fn handle_application_tx<
genesis,
Topic::SubstrateSign(SubstrateSignableId::Batch(batch)),
);
let nonce =
NonceDecider::handle_substrate_signable(txn, genesis, SubstrateSignableId::Batch(batch));
recognized_id(spec.set(), genesis, RecognizedIdType::Batch, batch.to_vec(), nonce).await;
recognized_id(spec.set(), genesis, RecognizedIdType::Batch, batch.to_vec()).await;
}

Transaction::SubstrateBlock(block) => {
Expand All @@ -539,10 +532,9 @@ pub(crate) async fn handle_application_tx<
despite us not providing that transaction",
);

let nonces = NonceDecider::handle_substrate_block(txn, genesis, &plan_ids);
for (nonce, id) in nonces.into_iter().zip(plan_ids.into_iter()) {
for id in plan_ids.into_iter() {
AttemptDb::recognize_topic(txn, genesis, Topic::Sign(id));
recognized_id(spec.set(), genesis, RecognizedIdType::Plan, id.to_vec(), nonce).await;
recognized_id(spec.set(), genesis, RecognizedIdType::Plan, id.to_vec()).await;
}
}

Expand All @@ -569,7 +561,6 @@ pub(crate) async fn handle_application_tx<
) {
Accumulation::Ready(DataSet::Participating(mut preprocesses)) => {
unflatten(spec, &mut preprocesses);
NonceDecider::selected_for_signing_substrate(txn, genesis, data.plan);
processors
.send(
spec.set().network,
Expand Down Expand Up @@ -645,7 +636,6 @@ pub(crate) async fn handle_application_tx<
) {
Accumulation::Ready(DataSet::Participating(mut preprocesses)) => {
unflatten(spec, &mut preprocesses);
NonceDecider::selected_for_signing_plan(txn, genesis, data.plan);
processors
.send(
spec.set().network,
Expand Down
Loading