Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/development' into feature/consen…
Browse files Browse the repository at this point in the history
…sus-consistent-abort-reason
  • Loading branch information
ksrichard committed Oct 14, 2024
2 parents 2a41305 + a0322f9 commit 6918164
Show file tree
Hide file tree
Showing 35 changed files with 775 additions and 236 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/audit.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: rustsec/audit-check@v1.4.1
- uses: rustsec/audit-check@v2.0.0
with:
token: ${{ secrets.GITHUB_TOKEN }}
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions applications/tari_validator_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ tari_state_store_sqlite = { workspace = true }
tari_networking = { workspace = true }
tari_rpc_framework = { workspace = true }
tari_template_builtin = { workspace = true }
tari_swarm = { workspace = true }

sqlite_message_logger = { workspace = true }

Expand Down
38 changes: 29 additions & 9 deletions applications/tari_validator_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{fs, io, ops::Deref, str::FromStr};
use std::{collections::HashMap, fs, io, ops::Deref, str::FromStr};

use anyhow::{anyhow, Context};
use futures::{future, FutureExt};
Expand Down Expand Up @@ -106,8 +106,9 @@ use crate::{
p2p::{
create_tari_validator_node_rpc_service,
services::{
consensus_gossip::{self, ConsensusGossipHandle},
mempool::{self, MempoolHandle},
messaging::{ConsensusInboundMessaging, ConsensusOutboundMessaging, Gossip},
messaging::{ConsensusInboundMessaging, ConsensusOutboundMessaging},
},
NopLogger,
},
Expand Down Expand Up @@ -137,7 +138,14 @@ pub async fn spawn_services(

// Networking
let (tx_consensus_messages, rx_consensus_messages) = mpsc::unbounded_channel();
let (tx_gossip_messages, rx_gossip_messages) = mpsc::unbounded_channel();

// gossip channels
let (tx_transaction_gossip_messages, rx_transaction_gossip_messages) = mpsc::unbounded_channel();
let (tx_consensus_gossip_messages, rx_consensus_gossip_messages) = mpsc::unbounded_channel();
let mut tx_gossip_messages_by_topic = HashMap::new();
tx_gossip_messages_by_topic.insert(mempool::TOPIC_PREFIX.to_string(), tx_transaction_gossip_messages);
tx_gossip_messages_by_topic.insert(consensus_gossip::TOPIC_PREFIX.to_string(), tx_consensus_gossip_messages);

let identity = identity::Keypair::sr25519_from_bytes(keypair.secret_key().as_bytes().to_vec()).map_err(|e| {
ExitError::new(
ExitCode::ConfigError,
Expand All @@ -157,11 +165,12 @@ pub async fn spawn_services(
p.addresses.into_iter().map(move |a| (peer_id, a))
})
.collect();

let (mut networking, join_handle) = tari_networking::spawn(
identity,
MessagingMode::Enabled {
tx_messages: tx_consensus_messages,
tx_gossip_messages,
tx_gossip_messages_by_topic,
},
tari_networking::Config {
listener_port: config.validator_node.p2p.listener_port,
Expand Down Expand Up @@ -239,18 +248,28 @@ pub async fn spawn_services(
per_log_cost: 1,
};

// Consensus gossip
let (consensus_gossip_service, join_handle, rx_consensus_gossip_messages) =
consensus_gossip::spawn(epoch_manager.clone(), networking.clone(), rx_consensus_gossip_messages);
handles.push(join_handle);

// Messaging
let message_logger = NopLogger; // SqliteMessageLogger::new(config.validator_node.data_dir.join("message_log.sqlite"));
let local_address = PeerAddress::from(keypair.public_key().clone());
let (loopback_sender, loopback_receiver) = mpsc::unbounded_channel();
let inbound_messaging = ConsensusInboundMessaging::new(
local_address,
rx_consensus_messages,
rx_consensus_gossip_messages,
loopback_receiver,
message_logger.clone(),
);
let outbound_messaging =
ConsensusOutboundMessaging::new(loopback_sender, networking.clone(), message_logger.clone());
let outbound_messaging = ConsensusOutboundMessaging::new(
loopback_sender,
consensus_gossip_service.clone(),
networking.clone(),
message_logger.clone(),
);

// Consensus
let payload_processor = TariDanTransactionProcessor::new(config.network, template_manager.clone(), fee_table);
Expand Down Expand Up @@ -284,15 +303,14 @@ pub async fn spawn_services(
.await;
handles.push(consensus_join_handle);

let gossip = Gossip::new(networking.clone(), rx_gossip_messages);

let (mempool, join_handle) = mempool::spawn(
consensus_constants.num_preshards,
gossip,
epoch_manager.clone(),
create_mempool_transaction_validator(template_manager.clone()),
state_store.clone(),
consensus_handle.clone(),
networking.clone(),
rx_transaction_gossip_messages,
#[cfg(feature = "metrics")]
metrics_registry,
);
Expand Down Expand Up @@ -363,6 +381,7 @@ pub async fn spawn_services(
dry_run_transaction_processor,
handles,
validator_node_client_factory,
consensus_gossip_service,
})
}

Expand Down Expand Up @@ -414,6 +433,7 @@ pub struct Services {
pub global_db: GlobalDb<SqliteGlobalDbAdapter<PeerAddress>>,
pub dry_run_transaction_processor: DryRunTransactionProcessor,
pub validator_node_client_factory: TariValidatorNodeRpcClientFactory,
pub consensus_gossip_service: ConsensusGossipHandle,
pub state_store: SqliteStateStore<PeerAddress>,

pub handles: Vec<JoinHandle<Result<(), anyhow::Error>>>,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2024. The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use tari_epoch_manager::EpochManagerError;
use tari_networking::NetworkingError;
use tokio::sync::{mpsc, oneshot};

use super::ConsensusGossipRequest;

#[derive(thiserror::Error, Debug)]
pub enum ConsensusGossipError {
#[error("Invalid message: {0}")]
InvalidMessage(#[from] anyhow::Error),
#[error("Epoch Manager Error: {0}")]
EpochManagerError(#[from] EpochManagerError),
#[error("Internal service request cancelled")]
RequestCancelled,
#[error("Network error: {0}")]
NetworkingError(#[from] NetworkingError),
}

impl From<mpsc::error::SendError<ConsensusGossipRequest>> for ConsensusGossipError {
fn from(_: mpsc::error::SendError<ConsensusGossipRequest>) -> Self {
Self::RequestCancelled
}
}

impl From<oneshot::error::RecvError> for ConsensusGossipError {
fn from(_: oneshot::error::RecvError) -> Self {
Self::RequestCancelled
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2024. The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use tari_consensus::messages::HotstuffMessage;
use tari_dan_common_types::ShardGroup;
use tokio::sync::{mpsc, oneshot};

use super::ConsensusGossipError;

pub enum ConsensusGossipRequest {
Multicast {
shard_group: ShardGroup,
message: HotstuffMessage,
reply: oneshot::Sender<Result<(), ConsensusGossipError>>,
},
GetLocalShardGroup {
reply: oneshot::Sender<Result<Option<ShardGroup>, ConsensusGossipError>>,
},
}

#[derive(Debug)]
pub struct ConsensusGossipHandle {
tx_consensus_request: mpsc::Sender<ConsensusGossipRequest>,
}

impl Clone for ConsensusGossipHandle {
fn clone(&self) -> Self {
ConsensusGossipHandle {
tx_consensus_request: self.tx_consensus_request.clone(),
}
}
}

impl ConsensusGossipHandle {
pub(super) fn new(tx_consensus_request: mpsc::Sender<ConsensusGossipRequest>) -> Self {
Self { tx_consensus_request }
}

pub async fn multicast(
&self,
shard_group: ShardGroup,
message: HotstuffMessage,
) -> Result<(), ConsensusGossipError> {
let (tx, rx) = oneshot::channel();
self.tx_consensus_request
.send(ConsensusGossipRequest::Multicast {
shard_group,
message,
reply: tx,
})
.await?;

rx.await?
}

pub async fn get_local_shard_group(&self) -> Result<Option<ShardGroup>, ConsensusGossipError> {
let (tx, rx) = oneshot::channel();
self.tx_consensus_request
.send(ConsensusGossipRequest::GetLocalShardGroup { reply: tx })
.await?;

rx.await?
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2024. The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use libp2p::{gossipsub, PeerId};
use log::*;
use tari_dan_common_types::PeerAddress;
use tari_dan_p2p::{proto, TariMessagingSpec};
use tari_epoch_manager::base_layer::EpochManagerHandle;
use tari_networking::NetworkingHandle;
use tokio::{sync::mpsc, task, task::JoinHandle};

use crate::p2p::services::consensus_gossip::{service::ConsensusGossipService, ConsensusGossipHandle};

const LOG_TARGET: &str = "tari::dan::validator_node::mempool";

pub fn spawn(
epoch_manager: EpochManagerHandle<PeerAddress>,
networking: NetworkingHandle<TariMessagingSpec>,
rx_gossip: mpsc::UnboundedReceiver<(PeerId, gossipsub::Message)>,
) -> (
ConsensusGossipHandle,
JoinHandle<anyhow::Result<()>>,
mpsc::Receiver<(PeerId, proto::consensus::HotStuffMessage)>,
) {
let (tx_consensus_request, rx_consensus_request) = mpsc::channel(10);
let (tx_consensus_gossip, rx_consensus_gossip) = mpsc::channel(10);

let consensus_gossip = ConsensusGossipService::new(
rx_consensus_request,
epoch_manager,
networking,
rx_gossip,
tx_consensus_gossip,
);
let handle = ConsensusGossipHandle::new(tx_consensus_request);

let join_handle = task::spawn(consensus_gossip.run());
debug!(target: LOG_TARGET, "Spawning consensus gossip service (task: {:?})", join_handle);

(handle, join_handle, rx_consensus_gossip)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2024. The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

mod error;
pub use error::*;

mod handle;
pub use handle::{ConsensusGossipHandle, ConsensusGossipRequest};

mod initializer;
pub use initializer::spawn;

mod service;
pub use service::TOPIC_PREFIX;
Loading

0 comments on commit 6918164

Please sign in to comment.