Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Break things (part 1) #3033

Merged
merged 2 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions crates/sp-consensus-subspace/src/inherents.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ extern crate alloc;
#[cfg(not(feature = "std"))]
use alloc::vec::Vec;
use codec::{Decode, Encode};
use sp_consensus_slots::Slot;
use sp_inherents::{Error, InherentData, InherentIdentifier, IsFatalError};
use subspace_core_primitives::SegmentHeader;

Expand Down Expand Up @@ -53,9 +52,6 @@ impl IsFatalError for InherentError {
/// The type of the Subspace inherent data.
#[derive(Debug, Encode, Decode)]
pub struct InherentType {
/// Slot at which block was created.
// TODO: Remove slot when breaking protocol and probably change the whole data structure to an enum
slot: Slot,
/// Segment headers expected to be included in the block.
pub segment_headers: Vec<SegmentHeader>,
}
Expand Down Expand Up @@ -90,11 +86,7 @@ impl InherentDataProvider {
/// Create new inherent data provider from the given `segment_headers`.
pub fn new(segment_headers: Vec<SegmentHeader>) -> Self {
Self {
data: InherentType {
// TODO: Remove slot when breaking protocol
slot: Default::default(),
segment_headers,
},
data: InherentType { segment_headers },
}
}

Expand Down
1 change: 0 additions & 1 deletion crates/subspace-core-primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,6 @@ impl SegmentHeader {
/// Sector index in consensus
pub type SectorIndex = u16;

// TODO: Versioned solution enum
/// Farmer solution for slot challenge.
#[derive(Clone, Debug, Eq, PartialEq, Encode, Decode, TypeInfo)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{PosTable, PosTableLegacy};
use crate::PosTable;
use anyhow::anyhow;
use clap::{Parser, Subcommand};
use criterion::{black_box, BatchSize, Criterion, Throughput};
Expand Down Expand Up @@ -131,9 +131,6 @@ fn audit(audit_options: AuditOptions) -> anyhow::Result<()> {

match single_disk_farm_info {
SingleDiskFarmInfo::V0 { .. } => {
audit_inner::<PosTableLegacy>(audit_options, single_disk_farm_info)
}
SingleDiskFarmInfo::V1 { .. } => {
audit_inner::<PosTable>(audit_options, single_disk_farm_info)
}
}
Expand Down Expand Up @@ -314,9 +311,6 @@ fn prove(prove_options: ProveOptions) -> anyhow::Result<()> {

match single_disk_farm_info {
SingleDiskFarmInfo::V0 { .. } => {
prove_inner::<PosTableLegacy>(prove_options, single_disk_farm_info)
}
SingleDiskFarmInfo::V1 { .. } => {
prove_inner::<PosTable>(prove_options, single_disk_farm_info)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,8 @@ impl ClusterSubcommand {
}
}

pub(crate) async fn cluster<PosTableLegacy, PosTable>(
cluster_args: ClusterArgs,
) -> anyhow::Result<()>
pub(crate) async fn cluster<PosTable>(cluster_args: ClusterArgs) -> anyhow::Result<()>
where
PosTableLegacy: Table,
PosTable: Table,
{
let signal = shutdown_signal();
Expand Down Expand Up @@ -123,11 +120,10 @@ where
controller(nats_client, &mut registry, controller_args).await?
}
ClusterSubcommand::Farmer(farmer_args) => {
farmer::<PosTableLegacy, PosTable>(nats_client, &mut registry, farmer_args).await?
farmer::<PosTable>(nats_client, &mut registry, farmer_args).await?
}
ClusterSubcommand::Plotter(plotter_args) => {
plotter::<PosTableLegacy, PosTable>(nats_client, &mut registry, plotter_args)
.await?
plotter::<PosTable>(nats_client, &mut registry, plotter_args).await?
}
ClusterSubcommand::Cache(cache_args) => {
cache(nats_client, &mut registry, cache_args).await?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,12 @@ pub(super) struct FarmerArgs {
pub(super) additional_components: Vec<String>,
}

pub(super) async fn farmer<PosTableLegacy, PosTable>(
pub(super) async fn farmer<PosTable>(
nats_client: NatsClient,
registry: &mut Registry,
farmer_args: FarmerArgs,
) -> anyhow::Result<Pin<Box<dyn Future<Output = anyhow::Result<()>>>>>
where
PosTableLegacy: Table,
PosTable: Table,
{
let FarmerArgs {
Expand Down Expand Up @@ -212,23 +211,13 @@ where
.unwrap_or_else(recommended_number_of_farming_threads);

let global_mutex = Arc::default();
let plotter_legacy = Arc::new(ClusterPlotter::new(
nats_client.clone(),
sector_encoding_concurrency,
ExponentialBackoff {
max_elapsed_time: None,
..ExponentialBackoff::default()
},
false,
));
let plotter = Arc::new(ClusterPlotter::new(
nats_client.clone(),
sector_encoding_concurrency,
ExponentialBackoff {
max_elapsed_time: None,
..ExponentialBackoff::default()
},
true,
));

let farms = {
Expand All @@ -248,7 +237,6 @@ where
let node_client = node_client.clone();
let kzg = kzg.clone();
let erasure_coding = erasure_coding.clone();
let plotter_legacy = Arc::clone(&plotter_legacy);
let plotter = Arc::clone(&plotter);
let global_mutex = Arc::clone(&global_mutex);
let faster_read_sector_record_chunks_mode_barrier =
Expand All @@ -257,15 +245,14 @@ where
Arc::clone(&faster_read_sector_record_chunks_mode_concurrency);

async move {
let farm_fut = SingleDiskFarm::new::<_, PosTableLegacy, PosTable>(
let farm_fut = SingleDiskFarm::new::<_, PosTable>(
SingleDiskFarmOptions {
directory: disk_farm.directory.clone(),
farmer_app_info,
allocated_space: disk_farm.allocated_space,
max_pieces_in_sector,
node_client,
reward_address,
plotter_legacy,
plotter,
kzg,
erasure_coding,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::commands::shared::PlottingThreadPriority;
use anyhow::anyhow;
use async_lock::Mutex as AsyncMutex;
use clap::Parser;
use futures::{select, FutureExt};
use prometheus_client::registry::Registry;
use std::future::Future;
use std::num::NonZeroUsize;
Expand Down Expand Up @@ -122,13 +121,12 @@ pub(super) struct PlotterArgs {
pub(super) additional_components: Vec<String>,
}

pub(super) async fn plotter<PosTableLegacy, PosTable>(
pub(super) async fn plotter<PosTable>(
nats_client: NatsClient,
registry: &mut Registry,
plotter_args: PlotterArgs,
) -> anyhow::Result<Pin<Box<dyn Future<Output = anyhow::Result<()>>>>>
where
PosTableLegacy: Table,
PosTable: Table,
{
let PlotterArgs {
Expand All @@ -149,8 +147,7 @@ where

let global_mutex = Arc::default();

let mut legacy_plotters = Vec::<Box<dyn Plotter + Send + Sync>>::new();
let mut modern_plotters = Vec::<Box<dyn Plotter + Send + Sync>>::new();
let mut plotters = Vec::<Box<dyn Plotter + Send + Sync>>::new();

#[cfg(feature = "cuda")]
{
Expand All @@ -164,12 +161,12 @@ where
)?;

if let Some(cuda_plotter) = maybe_cuda_plotter {
modern_plotters.push(Box::new(cuda_plotter));
plotters.push(Box::new(cuda_plotter));
}
}
{
let cpu_sector_encoding_concurrency = cpu_plotting_options.cpu_sector_encoding_concurrency;
let maybe_cpu_plotters = init_cpu_plotters::<_, PosTableLegacy, PosTable>(
let maybe_cpu_plotter = init_cpu_plotter::<_, PosTable>(
cpu_plotting_options,
piece_getter,
global_mutex,
Expand All @@ -178,45 +175,34 @@ where
registry,
)?;

if let Some((legacy_cpu_plotter, modern_cpu_plotter)) = maybe_cpu_plotters {
legacy_plotters.push(Box::new(legacy_cpu_plotter));
if !modern_plotters.is_empty() && cpu_sector_encoding_concurrency.is_none() {
info!(
"CPU plotting for v1 farms was disabled due to detected faster plotting with \
GPU"
);
if let Some(cpu_plotter) = maybe_cpu_plotter {
if !plotters.is_empty() && cpu_sector_encoding_concurrency.is_none() {
info!("CPU plotting was disabled due to detected faster plotting with GPU");
} else {
modern_plotters.push(Box::new(modern_cpu_plotter));
plotters.push(Box::new(cpu_plotter));
}
}
}
let legacy_plotter = Arc::new(PoolPlotter::new(legacy_plotters, PLOTTING_RETRY_INTERVAL));
let modern_plotter = Arc::new(PoolPlotter::new(modern_plotters, PLOTTING_RETRY_INTERVAL));
let plotter = Arc::new(PoolPlotter::new(plotters, PLOTTING_RETRY_INTERVAL));

Ok(Box::pin(async move {
select! {
result = plotter_service(&nats_client, &legacy_plotter, false).fuse() => {
result.map_err(|error| anyhow!("Plotter service failed: {error}"))
}
result = plotter_service(&nats_client, &modern_plotter, true).fuse() => {
result.map_err(|error| anyhow!("Plotter service failed: {error}"))
}
}
plotter_service(&nats_client, &plotter)
.await
.map_err(|error| anyhow!("Plotter service failed: {error}"))
}))
}

#[allow(clippy::type_complexity)]
fn init_cpu_plotters<PG, PosTableLegacy, PosTable>(
fn init_cpu_plotter<PG, PosTable>(
cpu_plotting_options: CpuPlottingOptions,
piece_getter: PG,
global_mutex: Arc<AsyncMutex<()>>,
kzg: Kzg,
erasure_coding: ErasureCoding,
registry: &mut Registry,
) -> anyhow::Result<Option<(CpuPlotter<PG, PosTableLegacy>, CpuPlotter<PG, PosTable>)>>
) -> anyhow::Result<Option<CpuPlotter<PG, PosTable>>>
where
PG: PieceGetter + Clone + Send + Sync + 'static,
PosTableLegacy: Table,
PosTable: Table,
{
let CpuPlottingOptions {
Expand Down Expand Up @@ -295,17 +281,7 @@ where
)
.map_err(|error| anyhow!("Failed to create thread pool manager: {error}"))?;

let legacy_cpu_plotter = CpuPlotter::<_, PosTableLegacy>::new(
piece_getter.clone(),
Arc::clone(&downloading_semaphore),
plotting_thread_pool_manager.clone(),
cpu_record_encoding_concurrency,
Arc::clone(&global_mutex),
kzg.clone(),
erasure_coding.clone(),
Some(registry),
);
let modern_cpu_plotter = CpuPlotter::<_, PosTable>::new(
let cpu_plotter = CpuPlotter::<_, PosTable>::new(
piece_getter,
downloading_semaphore,
plotting_thread_pool_manager,
Expand All @@ -316,7 +292,7 @@ where
Some(registry),
);

Ok(Some((legacy_cpu_plotter, modern_cpu_plotter)))
Ok(Some(cpu_plotter))
}

#[cfg(feature = "cuda")]
Expand Down
Loading
Loading