Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Remove deficit gossipsub scoring during topic transition #4486

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
9a20b0d
Remove topic weight from old fork topics
ackintosh Jul 8, 2023
9aff368
cargo fmt
ackintosh Jul 9, 2023
c2f002d
Make clippy happy
ackintosh Jul 9, 2023
e82e8b4
WIP: unit test
ackintosh Jul 11, 2023
c853427
Unit test using logs because there's no way to check PeerScoreParams …
ackintosh Jul 17, 2023
1009d88
Manually advance slots instead of sleeping to improve testing time
ackintosh Jul 20, 2023
575917c
Merge remote-tracking branch 'upstream/unstable' into remove-deficit-…
ackintosh Jul 20, 2023
4e97ec7
Fix return types
ackintosh Jul 21, 2023
5623e45
Fix clippy warning
ackintosh Jul 21, 2023
152d1f0
WIP: There is no way to test topic_weight since libp2p-gossipsub has …
ackintosh Jul 22, 2023
5589e87
Fix unexpected change
ackintosh Jul 22, 2023
c488b10
get_topic_params returns a reference to TopicScoreParams
ackintosh Aug 1, 2023
c5e6168
Merge branch 'unstable' into remove-deficit-gossipsub-scoring
ackintosh Aug 28, 2023
2e26f77
Check that topic_weight on the old topics has been zeroed
ackintosh Aug 28, 2023
75ac306
Update how to create beacon processor channels according to the lates…
ackintosh Aug 28, 2023
b144459
Compute subnets and add assertions
ackintosh Sep 1, 2023
1180022
Tweak port
ackintosh Sep 1, 2023
09d08c7
Allow dead code
ackintosh Sep 3, 2023
71a219d
Update beacon_node/lighthouse_network/src/service/mod.rs
ackintosh Sep 12, 2023
3c3257e
Update beacon_node/lighthouse_network/src/service/mod.rs
ackintosh Sep 12, 2023
567f969
Move NetworkService::get_topic_params to tests module since it's used…
ackintosh Sep 12, 2023
3636bb1
Merge branch 'unstable' into remove-deficit-gossipsub-scoring
ackintosh Sep 27, 2023
aa4acf5
Update port setting according to the QUIC support
ackintosh Sep 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions beacon_node/lighthouse_network/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use gossipsub_scoring_parameters::{lighthouse_gossip_thresholds, PeerScoreSettin
use libp2p::bandwidth::BandwidthSinks;
use libp2p::gossipsub::{
self, IdentTopic as Topic, MessageAcceptance, MessageAuthenticity, MessageId, PublishError,
TopicScoreParams,
};
use libp2p::identify;
use libp2p::multiaddr::{Multiaddr, Protocol as MProtocol};
Expand Down Expand Up @@ -621,6 +622,38 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
}
}

/// Remove topic weight from all topics that don't have the given fork digest.
pub fn remove_topic_weight_except(&mut self, except: [u8; 4]) {
let new_param = TopicScoreParams {
topic_weight: 0.0,
..Default::default()
};
let subscriptions = self.network_globals.gossipsub_subscriptions.read().clone();
for topic in subscriptions
.iter()
.filter(|topic| topic.fork_digest != except)
{
let libp2p_topic: Topic = topic.clone().into();
match self
.gossipsub_mut()
.set_topic_params(libp2p_topic, new_param.clone())
{
Ok(_) => debug!(self.log, "Removed topic weight"; "topic" => %topic),
Err(e) => {
warn!(self.log, "Failed to remove topic weight"; "topic" => %topic, "error" => e)
}
}
}
}

/// Returns the scoring parameters for a topic if set.
pub fn get_topic_params(&self, topic: GossipTopic) -> Option<&TopicScoreParams> {
self.swarm
.behaviour()
.gossipsub
.get_topic_params(&topic.into())
}

/// Subscribes to a gossipsub topic.
///
/// Returns `true` if the subscription was successful and `false` otherwise.
Expand Down
38 changes: 34 additions & 4 deletions beacon_node/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,15 +215,18 @@ pub struct NetworkService<T: BeaconChainTypes> {
}

impl<T: BeaconChainTypes> NetworkService<T> {
#[allow(clippy::type_complexity)]
pub async fn start(
async fn build(
beacon_chain: Arc<BeaconChain<T>>,
config: &NetworkConfig,
executor: task_executor::TaskExecutor,
gossipsub_registry: Option<&'_ mut Registry>,
beacon_processor_send: BeaconProcessorSend<T::EthSpec>,
beacon_processor_reprocess_tx: mpsc::Sender<ReprocessQueueMessage>,
) -> error::Result<(Arc<NetworkGlobals<T::EthSpec>>, NetworkSenders<T::EthSpec>)> {
) -> error::Result<(
NetworkService<T>,
Arc<NetworkGlobals<T::EthSpec>>,
NetworkSenders<T::EthSpec>,
)> {
let network_log = executor.log().clone();
// build the channels for external comms
let (network_senders, network_recievers) = NetworkSenders::new();
Expand Down Expand Up @@ -369,6 +372,28 @@ impl<T: BeaconChainTypes> NetworkService<T> {
enable_light_client_server: config.enable_light_client_server,
};

Ok((network_service, network_globals, network_senders))
}

#[allow(clippy::type_complexity)]
pub async fn start(
beacon_chain: Arc<BeaconChain<T>>,
config: &NetworkConfig,
executor: task_executor::TaskExecutor,
gossipsub_registry: Option<&'_ mut Registry>,
beacon_processor_send: BeaconProcessorSend<T::EthSpec>,
beacon_processor_reprocess_tx: mpsc::Sender<ReprocessQueueMessage>,
) -> error::Result<(Arc<NetworkGlobals<T::EthSpec>>, NetworkSenders<T::EthSpec>)> {
let (network_service, network_globals, network_senders) = Self::build(
beacon_chain,
config,
executor.clone(),
gossipsub_registry,
beacon_processor_send,
beacon_processor_reprocess_tx,
)
.await?;

network_service.spawn_service(executor);

Ok((network_globals, network_senders))
Expand Down Expand Up @@ -882,9 +907,10 @@ impl<T: BeaconChainTypes> NetworkService<T> {

fn update_next_fork(&mut self) {
let new_enr_fork_id = self.beacon_chain.enr_fork_id();
let new_fork_digest = new_enr_fork_id.fork_digest;

let fork_context = &self.fork_context;
if let Some(new_fork_name) = fork_context.from_context_bytes(new_enr_fork_id.fork_digest) {
if let Some(new_fork_name) = fork_context.from_context_bytes(new_fork_digest) {
info!(
self.log,
"Transitioned to new fork";
Expand All @@ -907,6 +933,10 @@ impl<T: BeaconChainTypes> NetworkService<T> {
Box::pin(next_fork_subscriptions_delay(&self.beacon_chain).into());
self.next_unsubscribe = Box::pin(Some(tokio::time::sleep(unsubscribe_delay)).into());
info!(self.log, "Network will unsubscribe from old fork gossip topics in a few epochs"; "remaining_epochs" => UNSUBSCRIBE_DELAY_EPOCHS);

// Remove topic weight from old fork topics to prevent peers that left on the mesh on
// old topics from being penalized for not sending us messages.
self.libp2p.remove_topic_weight_except(new_fork_digest);
} else {
crit!(self.log, "Unknown new enr fork id"; "new_fork_id" => ?new_enr_fork_id);
}
Expand Down
140 changes: 137 additions & 3 deletions beacon_node/network/src/service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,26 @@ mod tests {
use crate::persisted_dht::load_dht;
use crate::{NetworkConfig, NetworkService};
use beacon_chain::test_utils::BeaconChainHarness;
use beacon_processor::BeaconProcessorChannels;
use lighthouse_network::Enr;
use beacon_chain::BeaconChainTypes;
use beacon_processor::{BeaconProcessorChannels, BeaconProcessorConfig};
use futures::StreamExt;
use lighthouse_network::types::{GossipEncoding, GossipKind};
use lighthouse_network::{Enr, GossipTopic};
use slog::{o, Drain, Level, Logger};
use sloggers::{null::NullLoggerBuilder, Build};
use std::str::FromStr;
use std::sync::Arc;
use tokio::runtime::Runtime;
use types::MinimalEthSpec;
use types::{Epoch, EthSpec, ForkName, MinimalEthSpec, SubnetId};

impl<T: BeaconChainTypes> NetworkService<T> {
fn get_topic_params(
&self,
topic: GossipTopic,
) -> Option<&lighthouse_network::libp2p::gossipsub::TopicScoreParams> {
self.libp2p.get_topic_params(topic)
}
}

fn get_logger(actual_log: bool) -> Logger {
if actual_log {
Expand Down Expand Up @@ -102,4 +114,126 @@ mod tests {
"should have persisted the second ENR to store"
);
}

// Test removing topic weight on old topics when a fork happens.
#[test]
fn test_removing_topic_weight_on_old_topics() {
let runtime = Arc::new(Runtime::new().unwrap());

// Capella spec
let mut spec = MinimalEthSpec::default_spec();
spec.altair_fork_epoch = Some(Epoch::new(0));
spec.bellatrix_fork_epoch = Some(Epoch::new(0));
spec.capella_fork_epoch = Some(Epoch::new(1));

// Build beacon chain.
let beacon_chain = BeaconChainHarness::builder(MinimalEthSpec)
.spec(spec.clone())
.deterministic_keypairs(8)
.fresh_ephemeral_store()
.mock_execution_layer()
.build()
.chain;
let (next_fork_name, _) = beacon_chain.duration_to_next_fork().expect("next fork");
assert_eq!(next_fork_name, ForkName::Capella);

// Build network service.
let (mut network_service, network_globals, _network_senders) = runtime.block_on(async {
let (_, exit) = exit_future::signal();
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let executor = task_executor::TaskExecutor::new(
Arc::downgrade(&runtime),
exit,
get_logger(false),
shutdown_tx,
);

let mut config = NetworkConfig::default();
config.set_ipv4_listening_address(std::net::Ipv4Addr::UNSPECIFIED, 21214, 21214, 21215);
config.discv5_config.table_filter = |_| true; // Do not ignore local IPs
config.upnp_enabled = false;

let beacon_processor_channels =
BeaconProcessorChannels::new(&BeaconProcessorConfig::default());
NetworkService::build(
beacon_chain.clone(),
&config,
executor.clone(),
None,
beacon_processor_channels.beacon_processor_tx,
beacon_processor_channels.work_reprocessing_tx,
)
.await
.unwrap()
});

// Subscribe to the topics.
runtime.block_on(async {
while network_globals.gossipsub_subscriptions.read().len() < 2 {
if let Some(msg) = network_service.attestation_service.next().await {
network_service.on_attestation_service_msg(msg);
}
}
});

// Make sure the service is subscribed to the topics.
let (old_topic1, old_topic2) = {
let mut subnets = SubnetId::compute_subnets_for_epoch::<MinimalEthSpec>(
network_globals.local_enr().node_id().raw().into(),
beacon_chain.epoch().unwrap(),
&spec,
)
.unwrap()
.0
.collect::<Vec<_>>();
assert_eq!(2, subnets.len());

let old_fork_digest = beacon_chain.enr_fork_id().fork_digest;
let old_topic1 = GossipTopic::new(
GossipKind::Attestation(subnets.pop().unwrap()),
GossipEncoding::SSZSnappy,
old_fork_digest,
);
let old_topic2 = GossipTopic::new(
GossipKind::Attestation(subnets.pop().unwrap()),
GossipEncoding::SSZSnappy,
old_fork_digest,
);

(old_topic1, old_topic2)
};
let subscriptions = network_globals.gossipsub_subscriptions.read().clone();
assert_eq!(2, subscriptions.len());
assert!(subscriptions.contains(&old_topic1));
assert!(subscriptions.contains(&old_topic2));
let old_topic_params1 = network_service
.get_topic_params(old_topic1.clone())
.expect("topic score params");
assert!(old_topic_params1.topic_weight > 0.0);
let old_topic_params2 = network_service
.get_topic_params(old_topic2.clone())
.expect("topic score params");
assert!(old_topic_params2.topic_weight > 0.0);

// Advance slot to the next fork
for _ in 0..MinimalEthSpec::slots_per_epoch() {
beacon_chain.slot_clock.advance_slot();
}

// Run `NetworkService::update_next_fork()`.
runtime.block_on(async {
network_service.update_next_fork();
});

// Check that topic_weight on the old topics has been zeroed.
let old_topic_params1 = network_service
.get_topic_params(old_topic1)
.expect("topic score params");
assert_eq!(0.0, old_topic_params1.topic_weight);

let old_topic_params2 = network_service
.get_topic_params(old_topic2)
.expect("topic score params");
assert_eq!(0.0, old_topic_params2.topic_weight);
}
}