Skip to content

Commit

Permalink
fix: update channel sizes
Browse files Browse the repository at this point in the history
  • Loading branch information
gruberb committed Apr 24, 2024
1 parent d130dae commit 922e26d
Show file tree
Hide file tree
Showing 8 changed files with 14 additions and 14 deletions.
2 changes: 1 addition & 1 deletion crates/topos-p2p/src/behaviour/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl Behaviour {
.unwrap_or(Ok(MAX_BATCH_SIZE))
.unwrap();
let gossipsub = gossipsub::ConfigBuilder::default()
.max_transmit_size(20 * 1024 * 1024)
.max_transmit_size(20 * 2048 * 2048)
.validation_mode(gossipsub::ValidationMode::Strict)
.message_id_fn(|msg_id| {
// Content based id
Expand Down
6 changes: 3 additions & 3 deletions crates/topos-p2p/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ lazy_static! {
pub static ref EVENT_STREAM_BUFFER: usize = env::var("TCE_EVENT_STREAM_BUFFER")
.ok()
.and_then(|v| v.parse::<usize>().ok())
.unwrap_or(1024 * 20);
.unwrap_or(2048 * 2);
pub static ref CAPACITY_EVENT_STREAM_BUFFER: usize = EVENT_STREAM_BUFFER
.checked_mul(1_000)
.checked_mul(10)
.map(|v| {
let r: usize = v.checked_div(100).unwrap_or(*EVENT_STREAM_BUFFER);
r
Expand All @@ -23,7 +23,7 @@ lazy_static! {
pub static ref COMMAND_STREAM_BUFFER_SIZE: usize = env::var("TCE_COMMAND_STREAM_BUFFER_SIZE")
.ok()
.and_then(|v| v.parse::<usize>().ok())
.unwrap_or(1024 * 20);
.unwrap_or(2048);
}

pub const DISCOVERY_PROTOCOL: &str = "/tce-disco/1";
Expand Down
2 changes: 1 addition & 1 deletion crates/topos-tce-api/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl Runtime {
}

pub async fn launch(mut self) {
let mut health_update = tokio::time::interval(Duration::from_secs(1));
let mut health_update = tokio::time::interval(Duration::from_secs(10));
let shutdowned: Option<oneshot::Sender<()>> = loop {
tokio::select! {
shutdown = self.shutdown.recv() => {
Expand Down
6 changes: 3 additions & 3 deletions crates/topos-tce-broadcast/src/constant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@ lazy_static! {
std::env::var("TOPOS_DOUBLE_ECHO_COMMAND_CHANNEL_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(2048);
.unwrap_or(1024 * 20);
/// Size of the channel between double echo and the task manager
pub static ref BROADCAST_TASK_MANAGER_CHANNEL_SIZE: usize =
std::env::var("TOPOS_BROADCAST_TASK_MANAGER_CHANNEL_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(20_480);
.unwrap_or(1024 * 20);
/// Size of the channel to send protocol events from the double echo
pub static ref PROTOCOL_CHANNEL_SIZE: usize =
std::env::var("TOPOS_PROTOCOL_CHANNEL_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(2048);
.unwrap_or(1024 * 20);
/// Capacity alert threshold for the double echo command channel
pub static ref COMMAND_CHANNEL_CAPACITY: usize = COMMAND_CHANNEL_SIZE
.checked_mul(10)
Expand Down
2 changes: 1 addition & 1 deletion crates/topos-tce-broadcast/src/task_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl TaskManager {
}

pub async fn run(mut self, shutdown_receiver: CancellationToken) {
let mut pending_certificate_interval = tokio::time::interval(Duration::from_millis(200));
let mut pending_certificate_interval = tokio::time::interval(Duration::from_millis(3));

loop {
tokio::select! {
Expand Down
6 changes: 3 additions & 3 deletions crates/topos-tce-proxy/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ use topos_core::{
use tracing::{debug, error, info, info_span, warn, Instrument, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;

const CERTIFICATE_OUTBOUND_CHANNEL_SIZE: usize = 1024 * 10;
const CERTIFICATE_INBOUND_CHANNEL_SIZE: usize = 1024 * 10;
const TCE_PROXY_COMMAND_CHANNEL_SIZE: usize = 1024 * 10;
const CERTIFICATE_OUTBOUND_CHANNEL_SIZE: usize = 1024 * 5;
const CERTIFICATE_INBOUND_CHANNEL_SIZE: usize = 1024 * 5;
const TCE_PROXY_COMMAND_CHANNEL_SIZE: usize = 1024 * 5;

// Maximum backoff retry timeout in seconds (1 hour)
const TCE_SUBMIT_CERTIFICATE_BACKOFF_TIMEOUT: Duration = Duration::from_secs(3600);
Expand Down
2 changes: 1 addition & 1 deletion crates/topos-tce-proxy/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub struct TceProxyWorker {
}

impl TceProxyWorker {
/// Construct a new [`TceProxyWorker`] with a 128 items deep channel to send commands to and receive events from a TCE node on the given subnet.
/// Construct a new [`TceProxyWorker`] with a 1024 * 20 items deep channel to send commands to and receive events from a TCE node on the given subnet.
/// The worker holds a [`crate::client::TceClient`]
pub async fn new(config: TceProxyConfig) -> Result<(Self, Option<(Certificate, u64)>), Error> {
let (command_sender, mut command_rcv) = mpsc::channel::<TceProxyCommand>(1024 * 20);
Expand Down
2 changes: 1 addition & 1 deletion crates/topos-tce-synchronizer/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl Default for SynchronizerBuilder {
network_client: None,
store: None,
config: SynchronizationConfig::default(),
event_channel_size: 1024 * 20,
event_channel_size: 1024,
shutdown: None,
}
}
Expand Down

0 comments on commit 922e26d

Please sign in to comment.