diff --git a/Cargo.lock b/Cargo.lock index adf3cadc3c3..234dd9c2c10 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10129,6 +10129,7 @@ dependencies = [ "sp-runtime", "subspace-archiving", "subspace-core-primitives", + "subspace-erasure-coding", "subspace-proof-of-space", "subspace-verification", "thiserror", @@ -10161,6 +10162,7 @@ dependencies = [ "sp-runtime", "subspace-archiving", "subspace-core-primitives", + "subspace-erasure-coding", "subspace-farmer-components", "subspace-networking", "subspace-rpc-primitives", @@ -13013,6 +13015,7 @@ dependencies = [ "parking_lot 0.12.3", "prometheus-client 0.22.3", "prost 0.12.6", + "rayon", "sc-basic-authorship", "sc-chain-spec", "sc-client-api", @@ -13066,6 +13069,7 @@ dependencies = [ "static_assertions", "subspace-archiving", "subspace-core-primitives", + "subspace-erasure-coding", "subspace-networking", "subspace-proof-of-space", "subspace-runtime-primitives", diff --git a/crates/pallet-subspace/src/mock.rs b/crates/pallet-subspace/src/mock.rs index 8d5c119c5c0..d69c197eadd 100644 --- a/crates/pallet-subspace/src/mock.rs +++ b/crates/pallet-subspace/src/mock.rs @@ -370,7 +370,7 @@ pub fn create_archived_segment() -> &'static NewArchivedSegment { static ARCHIVED_SEGMENT: OnceLock = OnceLock::new(); ARCHIVED_SEGMENT.get_or_init(|| { - let mut archiver = Archiver::new(kzg_instance().clone()).unwrap(); + let mut archiver = Archiver::new(kzg_instance().clone(), erasure_coding_instance().clone()); let mut block = vec![0u8; RecordedHistorySegment::SIZE]; rand::thread_rng().fill(block.as_mut_slice()); diff --git a/crates/sc-consensus-subspace-rpc/Cargo.toml b/crates/sc-consensus-subspace-rpc/Cargo.toml index 2259e435537..9ff6acf4e86 100644 --- a/crates/sc-consensus-subspace-rpc/Cargo.toml +++ b/crates/sc-consensus-subspace-rpc/Cargo.toml @@ -34,6 +34,7 @@ sp-objects = { version = "0.1.0", path = "../sp-objects" } sp-runtime = { git = "https://github.com/subspace/polkadot-sdk", rev = "5626154d0781ac9a6ffd5a6207ed237f425ae631" } subspace-archiving = { version = "0.1.0", path = "../subspace-archiving" } subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primitives" } +subspace-erasure-coding = { version = "0.1.0", path = "../subspace-erasure-coding" } subspace-farmer-components = { version = "0.1.0", path = "../subspace-farmer-components" } subspace-networking = { version = "0.1.0", path = "../subspace-networking" } subspace-rpc-primitives = { version = "0.1.0", path = "../subspace-rpc-primitives" } diff --git a/crates/sc-consensus-subspace-rpc/src/lib.rs b/crates/sc-consensus-subspace-rpc/src/lib.rs index 6da66a22d8b..2026f5ad2b8 100644 --- a/crates/sc-consensus-subspace-rpc/src/lib.rs +++ b/crates/sc-consensus-subspace-rpc/src/lib.rs @@ -62,6 +62,7 @@ use subspace_core_primitives::{ BlockHash, HistorySize, Piece, PieceIndex, PublicKey, SegmentHeader, SegmentIndex, SlotNumber, Solution, }; +use subspace_erasure_coding::ErasureCoding; use subspace_farmer_components::FarmerProtocolInfo; use subspace_networking::libp2p::Multiaddr; use subspace_rpc_primitives::{ @@ -215,6 +216,8 @@ where pub deny_unsafe: DenyUnsafe, /// Kzg instance pub kzg: Kzg, + /// Erasure coding instance + pub erasure_coding: ErasureCoding, } /// Implements the [`SubspaceRpcApiServer`] trait for interacting with Subspace. 
@@ -243,6 +246,7 @@ where chain_constants: ChainConstants, max_pieces_in_sector: u16, kzg: Kzg, + erasure_coding: ErasureCoding, deny_unsafe: DenyUnsafe, _block: PhantomData, } @@ -300,6 +304,7 @@ where chain_constants, max_pieces_in_sector, kzg: config.kzg, + erasure_coding: config.erasure_coding, deny_unsafe: config.deny_unsafe, _block: PhantomData, }) @@ -717,7 +722,11 @@ where debug!(%requested_piece_index, "Re-creating genesis segment on demand"); // Try to re-create genesis segment on demand - match recreate_genesis_segment(&*self.client, self.kzg.clone()) { + match recreate_genesis_segment( + &*self.client, + self.kzg.clone(), + self.erasure_coding.clone(), + ) { Ok(Some(archived_segment)) => { let archived_segment = Arc::new(archived_segment); cached_archived_segment.replace(CachedArchivedSegment::Genesis( diff --git a/crates/sc-consensus-subspace/Cargo.toml b/crates/sc-consensus-subspace/Cargo.toml index 6cc2af46f29..059c11badf4 100644 --- a/crates/sc-consensus-subspace/Cargo.toml +++ b/crates/sc-consensus-subspace/Cargo.toml @@ -41,6 +41,7 @@ sp-objects = { version = "0.1.0", path = "../sp-objects" } sp-runtime = { git = "https://github.com/subspace/polkadot-sdk", rev = "5626154d0781ac9a6ffd5a6207ed237f425ae631" } subspace-archiving = { version = "0.1.0", path = "../subspace-archiving" } subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primitives" } +subspace-erasure-coding = { version = "0.1.0", path = "../subspace-erasure-coding" } subspace-proof-of-space = { version = "0.1.0", path = "../subspace-proof-of-space" } subspace-verification = { version = "0.1.0", path = "../subspace-verification" } thiserror = "1.0.63" diff --git a/crates/sc-consensus-subspace/src/archiver.rs b/crates/sc-consensus-subspace/src/archiver.rs index 8b9935610de..56882eb2fd9 100644 --- a/crates/sc-consensus-subspace/src/archiver.rs +++ b/crates/sc-consensus-subspace/src/archiver.rs @@ -79,6 +79,7 @@ use subspace_archiving::archiver::{Archiver, NewArchivedSegment}; use subspace_core_primitives::crypto::kzg::Kzg; use subspace_core_primitives::objects::BlockObjectMapping; use subspace_core_primitives::{BlockNumber, RecordedHistorySegment, SegmentHeader, SegmentIndex}; +use subspace_erasure_coding::ErasureCoding; use tracing::{debug, info, trace, warn}; /// Number of WASM instances is 8, this is a bit lower to avoid warnings exceeding number of @@ -412,6 +413,7 @@ where pub fn recreate_genesis_segment( client: &Client, kzg: Kzg, + erasure_coding: ErasureCoding, ) -> Result, Box> where Block: BlockT, @@ -437,7 +439,7 @@ where let encoded_block = encode_block(signed_block); - let new_archived_segment = Archiver::new(kzg)? 
+ let new_archived_segment = Archiver::new(kzg, erasure_coding) .add_block(encoded_block, block_object_mappings, false) .into_iter() .next() @@ -581,6 +583,7 @@ where let archiver = Archiver::with_initial_state( subspace_link.kzg().clone(), + subspace_link.erasure_coding().clone(), last_segment_header, &last_archived_block_encoded, block_object_mappings, @@ -591,7 +594,10 @@ where } else { info!("Starting archiving from genesis"); - Archiver::new(subspace_link.kzg().clone()).expect("Incorrect parameters for archiver") + Archiver::new( + subspace_link.kzg().clone(), + subspace_link.erasure_coding().clone(), + ) }; // Process blocks since last fully archived block up to the current head minus K diff --git a/crates/sc-consensus-subspace/src/lib.rs b/crates/sc-consensus-subspace/src/lib.rs index 81f3a41bc17..01bca2c342e 100644 --- a/crates/sc-consensus-subspace/src/lib.rs +++ b/crates/sc-consensus-subspace/src/lib.rs @@ -43,6 +43,7 @@ use crate::slot_worker::{NewSlotNotification, RewardSigningNotification}; use sp_consensus_subspace::ChainConstants; use sp_runtime::traits::Block as BlockT; use subspace_core_primitives::crypto::kzg::Kzg; +use subspace_erasure_coding::ErasureCoding; /// State that must be shared between various consensus components. #[derive(Clone)] @@ -59,11 +60,12 @@ pub struct SubspaceLink { SubspaceNotificationStream>, chain_constants: ChainConstants, kzg: Kzg, + erasure_coding: ErasureCoding, } impl SubspaceLink { /// Create new instance. - pub fn new(chain_constants: ChainConstants, kzg: Kzg) -> Self { + pub fn new(chain_constants: ChainConstants, kzg: Kzg, erasure_coding: ErasureCoding) -> Self { let (new_slot_notification_sender, new_slot_notification_stream) = notification::channel("subspace_new_slot_notification_stream"); let (reward_signing_notification_sender, reward_signing_notification_stream) = @@ -84,6 +86,7 @@ impl SubspaceLink { block_importing_notification_stream, chain_constants, kzg, + erasure_coding, } } @@ -127,4 +130,9 @@ impl SubspaceLink { pub fn kzg(&self) -> &Kzg { &self.kzg } + + /// Access erasure coding instance + pub fn erasure_coding(&self) -> &ErasureCoding { + &self.erasure_coding + } } diff --git a/crates/subspace-archiving/benches/archiving.rs b/crates/subspace-archiving/benches/archiving.rs index 0cd77d8c7de..c6491d867d6 100644 --- a/crates/subspace-archiving/benches/archiving.rs +++ b/crates/subspace-archiving/benches/archiving.rs @@ -1,8 +1,11 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; use rand::{thread_rng, Rng}; +use std::num::NonZeroUsize; use subspace_archiving::archiver::Archiver; use subspace_core_primitives::crypto::kzg; use subspace_core_primitives::crypto::kzg::Kzg; +use subspace_core_primitives::Record; +use subspace_erasure_coding::ErasureCoding; const AMOUNT_OF_DATA: usize = 5 * 1024 * 1024; const SMALL_BLOCK_SIZE: usize = 500; @@ -11,7 +14,12 @@ fn criterion_benchmark(c: &mut Criterion) { let mut input = vec![0u8; AMOUNT_OF_DATA]; thread_rng().fill(input.as_mut_slice()); let kzg = Kzg::new(kzg::embedded_kzg_settings()); - let archiver = Archiver::new(kzg).unwrap(); + let erasure_coding = ErasureCoding::new( + NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) + .expect("Not zero; qed"), + ) + .unwrap(); + let archiver = Archiver::new(kzg, erasure_coding); c.bench_function("segment-archiving-large-block", |b| { b.iter(|| { diff --git a/crates/subspace-archiving/src/archiver.rs b/crates/subspace-archiving/src/archiver.rs index 4144bc2fe0a..ed8ecc98df5 100644 --- 
a/crates/subspace-archiving/src/archiver.rs +++ b/crates/subspace-archiving/src/archiver.rs @@ -28,7 +28,6 @@ use alloc::vec; #[cfg(not(feature = "std"))] use alloc::vec::Vec; use core::cmp::Ordering; -use core::num::NonZeroUsize; use parity_scale_codec::{Compact, CompactLen, Decode, Encode, Input, Output}; #[cfg(feature = "parallel")] use rayon::prelude::*; @@ -189,12 +188,6 @@ pub struct NewArchivedSegment { #[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] #[cfg_attr(feature = "thiserror", derive(thiserror::Error))] pub enum ArchiverInstantiationError { - /// Failed to initialize erasure coding - #[cfg_attr( - feature = "thiserror", - error("Failed to initialize erasure coding: {0}") - )] - FailedToInitializeErasureCoding(String), /// Invalid last archived block, its size is the same as encoded block #[cfg_attr( feature = "thiserror", @@ -245,22 +238,9 @@ pub struct Archiver { } impl Archiver { - // TODO: Make erasure coding an explicit argument - /// Create a new instance with specified record size and recorded history segment size. - /// - /// Note: this is the only way to instantiate object archiver, while block archiver can be - /// instantiated with `BlockArchiver::with_initial_state()` in case of restarts. - pub fn new(kzg: Kzg) -> Result { - // TODO: Check if KZG can process number configured number of elements and update proof - // message in `.expect()` - - let erasure_coding = ErasureCoding::new( - NonZeroUsize::new(ArchivedHistorySegment::NUM_PIECES.ilog2() as usize) - .expect("Archived history segment contains at very least one piece; qed"), - ) - .map_err(ArchiverInstantiationError::FailedToInitializeErasureCoding)?; - - Ok(Self { + /// Create a new instance + pub fn new(kzg: Kzg, erasure_coding: ErasureCoding) -> Self { + Self { buffer: VecDeque::default(), incremental_record_commitments: IncrementalRecordCommitmentsState::with_capacity( RecordedHistorySegment::NUM_RAW_RECORDS, @@ -270,7 +250,7 @@ impl Archiver { segment_index: SegmentIndex::ZERO, prev_segment_header_hash: Blake3Hash::default(), last_archived_block: INITIAL_LAST_ARCHIVED_BLOCK, - }) + } } /// Create a new instance of the archiver with initial state in case of restart. @@ -278,11 +258,12 @@ impl Archiver { /// `block` corresponds to `last_archived_block` and will be processed accordingly to its state. pub fn with_initial_state( kzg: Kzg, + erasure_coding: ErasureCoding, segment_header: SegmentHeader, encoded_block: &[u8], mut object_mapping: BlockObjectMapping, ) -> Result { - let mut archiver = Self::new(kzg)?; + let mut archiver = Self::new(kzg, erasure_coding); archiver.segment_index = segment_header.segment_index() + SegmentIndex::ONE; archiver.prev_segment_header_hash = segment_header.hash(); diff --git a/crates/subspace-archiving/src/piece_reconstructor.rs b/crates/subspace-archiving/src/piece_reconstructor.rs index 3e9613970a9..80853d4e4aa 100644 --- a/crates/subspace-archiving/src/piece_reconstructor.rs +++ b/crates/subspace-archiving/src/piece_reconstructor.rs @@ -5,7 +5,6 @@ extern crate alloc; use alloc::string::String; #[cfg(not(feature = "std"))] use alloc::vec::Vec; -use core::num::NonZeroUsize; #[cfg(feature = "parallel")] use rayon::prelude::*; use subspace_core_primitives::crypto::kzg::{Commitment, Kzg, Polynomial}; @@ -13,18 +12,6 @@ use subspace_core_primitives::crypto::{blake3_254_hash_to_scalar, Scalar}; use subspace_core_primitives::{ArchivedHistorySegment, Piece, RawRecord}; use subspace_erasure_coding::ErasureCoding; -/// Reconstructor-related instantiation error. 
-#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] -#[cfg_attr(feature = "thiserror", derive(thiserror::Error))] -pub enum ReconstructorInstantiationError { - /// Failed to initialize erasure coding - #[cfg_attr( - feature = "thiserror", - error("Failed to initialize erasure coding: {0}") - )] - FailedToInitializeErasureCoding(String), -} - /// Reconstructor-related instantiation error #[derive(Debug, Clone, PartialEq)] #[cfg_attr(feature = "thiserror", derive(thiserror::Error))] @@ -55,21 +42,12 @@ pub struct PiecesReconstructor { } impl PiecesReconstructor { - // TODO: Make erasure coding an explicit argument - pub fn new(kzg: Kzg) -> Result { - // TODO: Check if KZG can process number configured number of elements and update proof - // message in `.expect()` - - let erasure_coding = ErasureCoding::new( - NonZeroUsize::new(ArchivedHistorySegment::NUM_PIECES.ilog2() as usize) - .expect("Archived history segment contains at very least one piece; qed"), - ) - .map_err(ReconstructorInstantiationError::FailedToInitializeErasureCoding)?; - - Ok(Self { + /// Create a new instance + pub fn new(kzg: Kzg, erasure_coding: ErasureCoding) -> Self { + Self { erasure_coding, kzg, - }) + } } /// Returns incomplete pieces (witness missing) and polynomial that can be used to generate diff --git a/crates/subspace-archiving/src/reconstructor.rs b/crates/subspace-archiving/src/reconstructor.rs index 0e4611754ed..d07693b7ffc 100644 --- a/crates/subspace-archiving/src/reconstructor.rs +++ b/crates/subspace-archiving/src/reconstructor.rs @@ -7,7 +7,6 @@ use alloc::string::String; #[cfg(not(feature = "std"))] use alloc::vec::Vec; use core::mem; -use core::num::NonZeroUsize; use parity_scale_codec::Decode; use subspace_core_primitives::crypto::Scalar; use subspace_core_primitives::{ @@ -16,18 +15,6 @@ use subspace_core_primitives::{ }; use subspace_erasure_coding::ErasureCoding; -/// Reconstructor-related instantiation error. 
-#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] -#[cfg_attr(feature = "thiserror", derive(thiserror::Error))] -pub enum ReconstructorInstantiationError { - /// Failed to initialize erasure coding - #[cfg_attr( - feature = "thiserror", - error("Failed to initialize erasure coding: {0}") - )] - FailedToInitializeErasureCoding(String), -} - /// Reconstructor-related instantiation error #[derive(Debug, Clone, PartialEq)] #[cfg_attr(feature = "thiserror", derive(thiserror::Error))] @@ -74,22 +61,13 @@ pub struct Reconstructor { } impl Reconstructor { - // TODO: Make erasure coding an explicit argument - pub fn new() -> Result { - // TODO: Check if KZG can process number configured number of elements and update proof - // message in `.expect()` - - let erasure_coding = ErasureCoding::new( - NonZeroUsize::new(ArchivedHistorySegment::NUM_PIECES.ilog2() as usize) - .expect("Archived history segment contains at very least one piece; qed"), - ) - .map_err(ReconstructorInstantiationError::FailedToInitializeErasureCoding)?; - - Ok(Self { + /// Create a new instance + pub fn new(erasure_coding: ErasureCoding) -> Self { + Self { erasure_coding, last_segment_index: None, partial_block: None, - }) + } } /// Given a set of pieces of a segment of the archived history (any half of all pieces are diff --git a/crates/subspace-archiving/tests/integration/archiver.rs b/crates/subspace-archiving/tests/integration/archiver.rs index 5ff8c700d4a..2d0808c2b6d 100644 --- a/crates/subspace-archiving/tests/integration/archiver.rs +++ b/crates/subspace-archiving/tests/integration/archiver.rs @@ -5,6 +5,7 @@ use rayon::prelude::*; use std::assert_matches::assert_matches; use std::io::Write; use std::iter; +use std::num::NonZeroUsize; use subspace_archiving::archiver; use subspace_archiving::archiver::{Archiver, ArchiverInstantiationError, SegmentItem}; use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg}; @@ -14,6 +15,7 @@ use subspace_core_primitives::{ ArchivedBlockProgress, ArchivedHistorySegment, Blake3Hash, LastArchivedBlock, Piece, Record, RecordedHistorySegment, SegmentCommitment, SegmentHeader, SegmentIndex, }; +use subspace_erasure_coding::ErasureCoding; fn extract_data>(data: &[u8], offset: O) -> &[u8] { let offset: u64 = offset.into(); @@ -64,7 +66,12 @@ fn compare_block_objects_to_piece_objects<'a>( #[test] fn archiver() { let kzg = Kzg::new(embedded_kzg_settings()); - let mut archiver = Archiver::new(kzg.clone()).unwrap(); + let erasure_coding = ErasureCoding::new( + NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) + .expect("Not zero; qed"), + ) + .unwrap(); + let mut archiver = Archiver::new(kzg.clone(), erasure_coding.clone()); let (block_0, block_0_object_mapping) = { let mut block = vec![0u8; RecordedHistorySegment::SIZE / 2]; @@ -220,6 +227,7 @@ fn archiver() { { let mut archiver_with_initial_state = Archiver::with_initial_state( kzg.clone(), + erasure_coding.clone(), first_archived_segment.segment_header, &block_1, block_1_object_mapping.clone(), @@ -346,6 +354,7 @@ fn archiver() { { let mut archiver_with_initial_state = Archiver::with_initial_state( kzg.clone(), + erasure_coding.clone(), last_segment_header, &block_2, BlockObjectMapping::default(), @@ -391,9 +400,15 @@ fn archiver() { #[test] fn invalid_usage() { let kzg = Kzg::new(embedded_kzg_settings()); + let erasure_coding = ErasureCoding::new( + NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) + .expect("Not zero; qed"), + ) + .unwrap(); { let result = 
Archiver::with_initial_state( kzg.clone(), + erasure_coding.clone(), SegmentHeader::V0 { segment_index: SegmentIndex::ZERO, segment_commitment: SegmentCommitment::default(), @@ -420,6 +435,7 @@ fn invalid_usage() { { let result = Archiver::with_initial_state( kzg, + erasure_coding.clone(), SegmentHeader::V0 { segment_index: SegmentIndex::ZERO, segment_commitment: SegmentCommitment::default(), @@ -456,6 +472,11 @@ fn invalid_usage() { #[test] fn one_byte_smaller_segment() { let kzg = Kzg::new(embedded_kzg_settings()); + let erasure_coding = ErasureCoding::new( + NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) + .expect("Not zero; qed"), + ) + .unwrap(); // Carefully compute the block size such that there is just 2 bytes left to fill the segment, // but this should already produce archived segment since just enum variant and smallest compact @@ -471,16 +492,14 @@ fn one_byte_smaller_segment() { // We leave two bytes at the end intentionally - 2; assert_eq!( - Archiver::new(kzg.clone()) - .unwrap() + Archiver::new(kzg.clone(), erasure_coding.clone()) .add_block(vec![0u8; block_size], BlockObjectMapping::default(), true) .len(), 1 ); // Cutting just one byte more is not sufficient to produce a segment, this is a protection // against code regressions - assert!(Archiver::new(kzg) - .unwrap() + assert!(Archiver::new(kzg, erasure_coding) .add_block( vec![0u8; block_size - 1], BlockObjectMapping::default(), @@ -492,7 +511,12 @@ fn one_byte_smaller_segment() { #[test] fn spill_over_edge_case() { let kzg = Kzg::new(embedded_kzg_settings()); - let mut archiver = Archiver::new(kzg).unwrap(); + let erasure_coding = ErasureCoding::new( + NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) + .expect("Not zero; qed"), + ) + .unwrap(); + let mut archiver = Archiver::new(kzg, erasure_coding); // Carefully compute the block size such that there is just 2 bytes left to fill the segment, // but this should already produce archived segment since just enum variant and smallest compact @@ -549,7 +573,12 @@ fn spill_over_edge_case() { #[test] fn object_on_the_edge_of_segment() { let kzg = Kzg::new(embedded_kzg_settings()); - let mut archiver = Archiver::new(kzg).unwrap(); + let erasure_coding = ErasureCoding::new( + NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) + .expect("Not zero; qed"), + ) + .unwrap(); + let mut archiver = Archiver::new(kzg, erasure_coding); let first_block = vec![0u8; RecordedHistorySegment::SIZE]; let archived_segments = archiver.add_block(first_block.clone(), BlockObjectMapping::default(), true); diff --git a/crates/subspace-archiving/tests/integration/piece_reconstruction.rs b/crates/subspace-archiving/tests/integration/piece_reconstruction.rs index b128e8450da..8a1abe8a638 100644 --- a/crates/subspace-archiving/tests/integration/piece_reconstruction.rs +++ b/crates/subspace-archiving/tests/integration/piece_reconstruction.rs @@ -1,11 +1,15 @@ use rand::Rng; #[cfg(feature = "parallel")] use rayon::prelude::*; +use std::num::NonZeroUsize; use subspace_archiving::archiver::Archiver; use subspace_archiving::piece_reconstructor::{PiecesReconstructor, ReconstructorError}; use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg}; use subspace_core_primitives::objects::BlockObjectMapping; -use subspace_core_primitives::{ArchivedHistorySegment, FlatPieces, Piece, RecordedHistorySegment}; +use subspace_core_primitives::{ + ArchivedHistorySegment, FlatPieces, Piece, Record, RecordedHistorySegment, 
+}; +use subspace_erasure_coding::ErasureCoding; fn pieces_to_option_of_pieces(pieces: &FlatPieces) -> Vec> { pieces.pieces().map(Some).collect() @@ -21,7 +25,12 @@ fn get_random_block() -> Vec { #[test] fn segment_reconstruction_works() { let kzg = Kzg::new(embedded_kzg_settings()); - let mut archiver = Archiver::new(kzg.clone()).unwrap(); + let erasure_coding = ErasureCoding::new( + NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) + .expect("Not zero; qed"), + ) + .unwrap(); + let mut archiver = Archiver::new(kzg.clone(), erasure_coding.clone()); let block = get_random_block(); @@ -42,7 +51,7 @@ fn segment_reconstruction_works() { piece.take(); }); - let reconstructor = PiecesReconstructor::new(kzg).unwrap(); + let reconstructor = PiecesReconstructor::new(kzg, erasure_coding); let flat_pieces = reconstructor.reconstruct_segment(&maybe_pieces).unwrap(); @@ -62,7 +71,12 @@ fn segment_reconstruction_works() { #[test] fn piece_reconstruction_works() { let kzg = Kzg::new(embedded_kzg_settings()); - let mut archiver = Archiver::new(kzg.clone()).unwrap(); + let erasure_coding = ErasureCoding::new( + NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) + .expect("Not zero; qed"), + ) + .unwrap(); + let mut archiver = Archiver::new(kzg.clone(), erasure_coding.clone()); // Block that fits into the segment fully let block = get_random_block(); @@ -83,7 +97,7 @@ fn piece_reconstruction_works() { .map(|(piece_position, piece)| (piece_position, piece.take().unwrap())) .collect::>(); - let reconstructor = PiecesReconstructor::new(kzg).unwrap(); + let reconstructor = PiecesReconstructor::new(kzg, erasure_coding); #[cfg(not(feature = "parallel"))] let iter = missing_pieces.iter(); @@ -107,8 +121,12 @@ fn piece_reconstruction_works() { #[test] fn segment_reconstruction_fails() { let kzg = Kzg::new(embedded_kzg_settings()); - - let reconstructor = PiecesReconstructor::new(kzg.clone()).unwrap(); + let erasure_coding = ErasureCoding::new( + NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) + .expect("Not zero; qed"), + ) + .unwrap(); + let reconstructor = PiecesReconstructor::new(kzg.clone(), erasure_coding.clone()); let pieces = vec![None]; let result = reconstructor.reconstruct_segment(&pieces); @@ -122,7 +140,7 @@ fn segment_reconstruction_fails() { )); } - let mut archiver = Archiver::new(kzg).unwrap(); + let mut archiver = Archiver::new(kzg, erasure_coding); // Block that fits into the segment fully let block = get_random_block(); @@ -144,8 +162,12 @@ fn segment_reconstruction_fails() { #[test] fn piece_reconstruction_fails() { let kzg = Kzg::new(embedded_kzg_settings()); - - let reconstructor = PiecesReconstructor::new(kzg.clone()).unwrap(); + let erasure_coding = ErasureCoding::new( + NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) + .expect("Not zero; qed"), + ) + .unwrap(); + let reconstructor = PiecesReconstructor::new(kzg.clone(), erasure_coding.clone()); let pieces = vec![None]; let result = reconstructor.reconstruct_piece(&pieces, 0); @@ -159,7 +181,7 @@ fn piece_reconstruction_fails() { )); } - let mut archiver = Archiver::new(kzg).unwrap(); + let mut archiver = Archiver::new(kzg, erasure_coding); // Block that fits into the segment fully let block = get_random_block(); diff --git a/crates/subspace-archiving/tests/integration/reconstructor.rs b/crates/subspace-archiving/tests/integration/reconstructor.rs index e9f612aae67..701cba89d39 100644 --- 
a/crates/subspace-archiving/tests/integration/reconstructor.rs +++ b/crates/subspace-archiving/tests/integration/reconstructor.rs @@ -1,14 +1,16 @@ use rand::{thread_rng, Rng}; use std::assert_matches::assert_matches; use std::iter; +use std::num::NonZeroUsize; use subspace_archiving::archiver::Archiver; use subspace_archiving::reconstructor::{Reconstructor, ReconstructorError}; use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg}; use subspace_core_primitives::objects::BlockObjectMapping; use subspace_core_primitives::{ - ArchivedBlockProgress, ArchivedHistorySegment, FlatPieces, LastArchivedBlock, Piece, + ArchivedBlockProgress, ArchivedHistorySegment, FlatPieces, LastArchivedBlock, Piece, Record, RecordedHistorySegment, SegmentIndex, }; +use subspace_erasure_coding::ErasureCoding; fn pieces_to_option_of_pieces(pieces: &FlatPieces) -> Vec> { pieces.pieces().map(Some).collect() @@ -17,7 +19,12 @@ fn pieces_to_option_of_pieces(pieces: &FlatPieces) -> Vec> { #[test] fn basic() { let kzg = Kzg::new(embedded_kzg_settings()); - let mut archiver = Archiver::new(kzg).unwrap(); + let erasure_coding = ErasureCoding::new( + NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) + .expect("Not zero; qed"), + ) + .unwrap(); + let mut archiver = Archiver::new(kzg, erasure_coding.clone()); // Block that fits into the segment fully let block_0 = { let mut block = vec![0u8; RecordedHistorySegment::SIZE / 2]; @@ -59,7 +66,7 @@ fn basic() { assert_eq!(archived_segments.len(), 5); - let mut reconstructor = Reconstructor::new().unwrap(); + let mut reconstructor = Reconstructor::new(erasure_coding.clone()); { let contents = reconstructor @@ -91,7 +98,7 @@ fn basic() { } ); - let mut partial_reconstructor = Reconstructor::new().unwrap(); + let mut partial_reconstructor = Reconstructor::new(erasure_coding.clone()); let contents = partial_reconstructor .add_segment(&pieces_to_option_of_pieces(&archived_segments[1].pieces)) .unwrap(); @@ -132,7 +139,7 @@ fn basic() { } ); - let mut partial_reconstructor = Reconstructor::new().unwrap(); + let mut partial_reconstructor = Reconstructor::new(erasure_coding.clone()); let contents = partial_reconstructor .add_segment(&pieces_to_option_of_pieces(&archived_segments[2].pieces)) .unwrap(); @@ -175,7 +182,7 @@ fn basic() { } { - let mut partial_reconstructor = Reconstructor::new().unwrap(); + let mut partial_reconstructor = Reconstructor::new(erasure_coding.clone()); let contents = partial_reconstructor .add_segment(&pieces_to_option_of_pieces(&archived_segments[3].pieces)) .unwrap(); @@ -218,7 +225,7 @@ fn basic() { } { - let mut partial_reconstructor = Reconstructor::new().unwrap(); + let mut partial_reconstructor = Reconstructor::new(erasure_coding); let contents = partial_reconstructor .add_segment(&pieces_to_option_of_pieces(&archived_segments[4].pieces)) .unwrap(); @@ -243,7 +250,12 @@ fn basic() { #[test] fn partial_data() { let kzg = Kzg::new(embedded_kzg_settings()); - let mut archiver = Archiver::new(kzg).unwrap(); + let erasure_coding = ErasureCoding::new( + NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) + .expect("Not zero; qed"), + ) + .unwrap(); + let mut archiver = Archiver::new(kzg, erasure_coding.clone()); // Block that fits into the segment fully let block_0 = { let mut block = vec![0u8; RecordedHistorySegment::SIZE / 2]; @@ -268,8 +280,7 @@ fn partial_data() { { // Take just source shards - let contents = Reconstructor::new() - .unwrap() + let contents = 
Reconstructor::new(erasure_coding.clone()) .add_segment( &pieces .source_pieces() @@ -285,8 +296,7 @@ fn partial_data() { { // Take just parity shards - let contents = Reconstructor::new() - .unwrap() + let contents = Reconstructor::new(erasure_coding.clone()) .add_segment( &iter::repeat(None) .take(RecordedHistorySegment::NUM_RAW_RECORDS) @@ -312,7 +322,9 @@ fn partial_data() { .for_each(|piece| { piece.take(); }); - let contents = Reconstructor::new().unwrap().add_segment(&pieces).unwrap(); + let contents = Reconstructor::new(erasure_coding) + .add_segment(&pieces) + .unwrap(); assert_eq!(contents.blocks, vec![(0, block_0)]); } @@ -321,8 +333,12 @@ fn partial_data() { #[test] fn invalid_usage() { let kzg = Kzg::new(embedded_kzg_settings()); - - let mut archiver = Archiver::new(kzg).unwrap(); + let erasure_coding = ErasureCoding::new( + NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) + .expect("Not zero; qed"), + ) + .unwrap(); + let mut archiver = Archiver::new(kzg, erasure_coding.clone()); // Block that overflows into the next segments let block_0 = { let mut block = vec![0u8; RecordedHistorySegment::SIZE * 4]; @@ -336,7 +352,7 @@ fn invalid_usage() { { // Not enough shards with contents - let result = Reconstructor::new().unwrap().add_segment( + let result = Reconstructor::new(erasure_coding.clone()).add_segment( &archived_segments[0] .pieces .pieces() @@ -352,7 +368,7 @@ fn invalid_usage() { { // Garbage data - let result = Reconstructor::new().unwrap().add_segment( + let result = Reconstructor::new(erasure_coding.clone()).add_segment( &iter::repeat_with(|| { let mut piece = Piece::default(); thread_rng().fill(piece.as_mut()); @@ -366,7 +382,7 @@ fn invalid_usage() { } { - let mut reconstructor = Reconstructor::new().unwrap(); + let mut reconstructor = Reconstructor::new(erasure_coding); reconstructor .add_segment(&pieces_to_option_of_pieces(&archived_segments[0].pieces)) diff --git a/crates/subspace-core-primitives/src/crypto.rs b/crates/subspace-core-primitives/src/crypto.rs index c9e445b2b91..a4ca67164c0 100644 --- a/crates/subspace-core-primitives/src/crypto.rs +++ b/crates/subspace-core-primitives/src/crypto.rs @@ -63,9 +63,7 @@ pub fn blake3_hash_list(data: &[&[u8]]) -> Blake3Hash { *state.finalize().as_bytes() } -/// BLAKE3 hashing of a single value truncated to 254 bits. -/// -/// TODO: We probably wouldn't need this eventually +/// BLAKE3 hashing of a single value truncated to 254 bits as Scalar for usage with KZG. pub fn blake3_254_hash_to_scalar(data: &[u8]) -> Scalar { let mut hash = blake3_hash(data); // Erase last 2 bits to effectively truncate the hash (number is interpreted as little-endian) diff --git a/crates/subspace-core-primitives/src/crypto/kzg.rs b/crates/subspace-core-primitives/src/crypto/kzg.rs index ade459970e6..246798ad6d8 100644 --- a/crates/subspace-core-primitives/src/crypto/kzg.rs +++ b/crates/subspace-core-primitives/src/crypto/kzg.rs @@ -445,7 +445,7 @@ impl Kzg { /// Get FFT settings for specified number of values, uses internal cache to avoid derivation /// every time. 
- pub fn get_fft_settings(&self, num_values: usize) -> Result, String> { + fn get_fft_settings(&self, num_values: usize) -> Result, String> { let num_values = num_values.next_power_of_two(); Ok( match self.inner.fft_settings_cache.lock().entry(num_values) { diff --git a/crates/subspace-farmer-components/benches/auditing.rs b/crates/subspace-farmer-components/benches/auditing.rs index 095c3b79dc3..d5a8e0d45ce 100644 --- a/crates/subspace-farmer-components/benches/auditing.rs +++ b/crates/subspace-farmer-components/benches/auditing.rs @@ -47,12 +47,12 @@ pub fn criterion_benchmark(c: &mut Criterion) { let mut input = RecordedHistorySegment::new_boxed(); StdRng::seed_from_u64(42).fill(AsMut::<[u8]>::as_mut(input.as_mut())); let kzg = Kzg::new(kzg::embedded_kzg_settings()); - let mut archiver = Archiver::new(kzg.clone()).unwrap(); let erasure_coding = ErasureCoding::new( NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) .expect("Not zero; qed"), ) .unwrap(); + let mut archiver = Archiver::new(kzg.clone(), erasure_coding.clone()); let mut table_generator = PosTable::generator(); let archived_history_segment = archiver .add_block( diff --git a/crates/subspace-farmer-components/benches/plotting.rs b/crates/subspace-farmer-components/benches/plotting.rs index fd602b0ad87..0c7aa6e2e6e 100644 --- a/crates/subspace-farmer-components/benches/plotting.rs +++ b/crates/subspace-farmer-components/benches/plotting.rs @@ -29,12 +29,12 @@ fn criterion_benchmark(c: &mut Criterion) { let mut input = RecordedHistorySegment::new_boxed(); StdRng::seed_from_u64(42).fill(AsMut::<[u8]>::as_mut(input.as_mut())); let kzg = Kzg::new(kzg::embedded_kzg_settings()); - let mut archiver = Archiver::new(kzg.clone()).unwrap(); let erasure_coding = ErasureCoding::new( NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) .expect("Not zero; qed"), ) .unwrap(); + let mut archiver = Archiver::new(kzg.clone(), erasure_coding.clone()); let mut table_generators = [ PosTable::generator(), PosTable::generator(), diff --git a/crates/subspace-farmer-components/benches/proving.rs b/crates/subspace-farmer-components/benches/proving.rs index b3edcd8f20d..ce351e683e0 100644 --- a/crates/subspace-farmer-components/benches/proving.rs +++ b/crates/subspace-farmer-components/benches/proving.rs @@ -55,12 +55,12 @@ pub fn criterion_benchmark(c: &mut Criterion) { let mut rng = StdRng::seed_from_u64(42); rng.fill(AsMut::<[u8]>::as_mut(input.as_mut())); let kzg = &Kzg::new(kzg::embedded_kzg_settings()); - let mut archiver = Archiver::new(kzg.clone()).unwrap(); let erasure_coding = &ErasureCoding::new( NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) .expect("Not zero; qed"), ) .unwrap(); + let mut archiver = Archiver::new(kzg.clone(), erasure_coding.clone()); let mut table_generator = PosTable::generator(); let archived_history_segment = archiver .add_block( diff --git a/crates/subspace-farmer-components/benches/reading.rs b/crates/subspace-farmer-components/benches/reading.rs index c7199e7a3d6..96f5536f7a1 100644 --- a/crates/subspace-farmer-components/benches/reading.rs +++ b/crates/subspace-farmer-components/benches/reading.rs @@ -48,12 +48,12 @@ pub fn criterion_benchmark(c: &mut Criterion) { let mut input = RecordedHistorySegment::new_boxed(); StdRng::seed_from_u64(42).fill(AsMut::<[u8]>::as_mut(input.as_mut())); let kzg = Kzg::new(kzg::embedded_kzg_settings()); - let mut archiver = Archiver::new(kzg.clone()).unwrap(); let erasure_coding = ErasureCoding::new( 
NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) .expect("Not zero; qed"), ) .unwrap(); + let mut archiver = Archiver::new(kzg.clone(), erasure_coding.clone()); let mut table_generator = PosTable::generator(); let archived_history_segment = archiver .add_block( diff --git a/crates/subspace-farmer-components/src/plotting.rs b/crates/subspace-farmer-components/src/plotting.rs index d5c2469e125..cd56a1a1c80 100644 --- a/crates/subspace-farmer-components/src/plotting.rs +++ b/crates/subspace-farmer-components/src/plotting.rs @@ -188,6 +188,7 @@ where piece_getter, farmer_protocol_info, kzg, + erasure_coding, pieces_in_sector, }); @@ -232,6 +233,8 @@ pub struct DownloadSectorOptions<'a, PG> { pub farmer_protocol_info: FarmerProtocolInfo, /// KZG instance pub kzg: &'a Kzg, + /// Erasure coding instance + pub erasure_coding: &'a ErasureCoding, /// How many pieces should sector contain pub pieces_in_sector: u16, } @@ -252,6 +255,7 @@ where piece_getter, farmer_protocol_info, kzg, + erasure_coding, pieces_in_sector, } = options; @@ -285,6 +289,7 @@ where &mut raw_sector, piece_getter, kzg, + erasure_coding, &mut incremental_piece_indices, ) .await @@ -628,6 +633,7 @@ async fn download_sector_internal( raw_sector: &mut RawSector, piece_getter: &PG, kzg: &Kzg, + erasure_coding: &ErasureCoding, piece_indexes: &mut [Option], ) -> Result<(), PlottingError> { // TODO: Make configurable, likely allowing user to specify RAM usage expectations and inferring @@ -659,8 +665,13 @@ async fn download_sector_internal( return Err(PlottingError::FailedToRetrievePiece { piece_index, error }); } }; - let recovered_piece = - recover_missing_piece(piece_getter, kzg.clone(), piece_index).await; + let recovered_piece = recover_missing_piece( + piece_getter, + kzg.clone(), + erasure_coding.clone(), + piece_index, + ) + .await; piece_result = recovered_piece.map(Some).map_err(Into::into); } diff --git a/crates/subspace-farmer-components/src/segment_reconstruction.rs b/crates/subspace-farmer-components/src/segment_reconstruction.rs index 74396db9f7d..78db118d11b 100644 --- a/crates/subspace-farmer-components/src/segment_reconstruction.rs +++ b/crates/subspace-farmer-components/src/segment_reconstruction.rs @@ -5,6 +5,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use subspace_archiving::piece_reconstructor::{PiecesReconstructor, ReconstructorError}; use subspace_core_primitives::crypto::kzg::Kzg; use subspace_core_primitives::{ArchivedHistorySegment, Piece, PieceIndex, RecordedHistorySegment}; +use subspace_erasure_coding::ErasureCoding; use thiserror::Error; use tokio::sync::Semaphore; use tokio::task::JoinError; @@ -28,6 +29,7 @@ pub(crate) enum SegmentReconstructionError { pub(crate) async fn recover_missing_piece( piece_getter: &PG, kzg: Kzg, + erasure_coding: ErasureCoding, missing_piece_index: PieceIndex, ) -> Result { info!(%missing_piece_index, "Recovering missing piece..."); @@ -103,8 +105,7 @@ pub(crate) async fn recover_missing_piece( } let result = tokio::task::spawn_blocking(move || { - let reconstructor = - PiecesReconstructor::new(kzg).expect("Internal constructor call must succeed."); + let reconstructor = PiecesReconstructor::new(kzg, erasure_coding); reconstructor.reconstruct_piece(&segment_pieces, position as usize) }) diff --git a/crates/subspace-farmer/src/plotter/cpu.rs b/crates/subspace-farmer/src/plotter/cpu.rs index 37492943a0c..f289882d543 100644 --- a/crates/subspace-farmer/src/plotter/cpu.rs +++ b/crates/subspace-farmer/src/plotter/cpu.rs @@ -311,6 +311,7 @@ 
where piece_getter: &piece_getter, farmer_protocol_info, kzg: &kzg, + erasure_coding: &erasure_coding, pieces_in_sector, }); diff --git a/crates/subspace-service/Cargo.toml b/crates/subspace-service/Cargo.toml index 9e367208825..d5cd71e0d83 100644 --- a/crates/subspace-service/Cargo.toml +++ b/crates/subspace-service/Cargo.toml @@ -32,6 +32,7 @@ parity-scale-codec = "3.6.12" parking_lot = "0.12.2" prometheus-client = "0.22.3" prost = "0.12" +rayon = "1.10.0" sc-basic-authorship = { git = "https://github.com/subspace/polkadot-sdk", rev = "5626154d0781ac9a6ffd5a6207ed237f425ae631" } sc-chain-spec = { git = "https://github.com/subspace/polkadot-sdk", rev = "5626154d0781ac9a6ffd5a6207ed237f425ae631" } sc-client-api = { git = "https://github.com/subspace/polkadot-sdk", rev = "5626154d0781ac9a6ffd5a6207ed237f425ae631" } @@ -84,6 +85,7 @@ sp-transaction-pool = { git = "https://github.com/subspace/polkadot-sdk", rev = static_assertions = "1.1.0" subspace-archiving = { version = "0.1.0", path = "../subspace-archiving" } subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primitives" } +subspace-erasure-coding = { version = "0.1.0", path = "../subspace-erasure-coding" } subspace-networking = { version = "0.1.0", path = "../subspace-networking" } subspace-proof-of-space = { version = "0.1.0", path = "../subspace-proof-of-space" } subspace-runtime-primitives = { version = "0.1.0", path = "../subspace-runtime-primitives" } diff --git a/crates/subspace-service/src/lib.rs b/crates/subspace-service/src/lib.rs index 18affa51d61..b56a2a03c40 100644 --- a/crates/subspace-service/src/lib.rs +++ b/crates/subspace-service/src/lib.rs @@ -113,10 +113,12 @@ use sp_subspace_mmr::host_functions::{SubspaceMmrExtension, SubspaceMmrHostFunct use sp_transaction_pool::runtime_api::TaggedTransactionQueue; use static_assertions::const_assert; use std::marker::PhantomData; +use std::num::NonZeroUsize; use std::sync::Arc; use std::time::Duration; use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg}; -use subspace_core_primitives::{BlockNumber, PotSeed, REWARD_SIGNING_CONTEXT}; +use subspace_core_primitives::{BlockNumber, PotSeed, Record, REWARD_SIGNING_CONTEXT}; +use subspace_erasure_coding::ErasureCoding; use subspace_networking::libp2p::multiaddr::Protocol; use subspace_networking::utils::piece_provider::PieceProvider; use subspace_proof_of_space::Table; @@ -519,7 +521,20 @@ where false, )?; - let kzg = tokio::task::block_in_place(|| Kzg::new(embedded_kzg_settings())); + // TODO: Make these explicit arguments once we no longer use Substrate's `Configuration` + let (kzg, maybe_erasure_coding) = tokio::task::block_in_place(|| { rayon::join( || Kzg::new(embedded_kzg_settings()), || { ErasureCoding::new( NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) .expect("Not zero; qed"), ) .map_err(|error| format!("Failed to instantiate erasure coding: {error}")) }, ) }); + let erasure_coding = maybe_erasure_coding?; let client = Arc::new(client); let client_info = client.info(); @@ -559,7 +574,7 @@ where }) .map_err(|error| ServiceError::Application(error.into()))?; - let subspace_link = SubspaceLink::new(chain_constants, kzg.clone()); + let subspace_link = SubspaceLink::new(chain_constants, kzg.clone(), erasure_coding); let segment_headers_store = segment_headers_store.clone(); let block_import = SubspaceBlockImport::::new( @@ -988,6 +1003,7 @@ where dsn_sync_piece_getter.clone(), Arc::clone(&network_service), sync_service.clone(), +
subspace_link.erasure_coding().clone(), ); let (observer, worker) = sync_from_dsn::create_observer_and_worker( @@ -1000,6 +1016,7 @@ where sync_target_block_number, pause_sync, dsn_sync_piece_getter, + subspace_link.erasure_coding().clone(), ); task_manager .spawn_handle() @@ -1212,6 +1229,7 @@ where segment_headers_store: segment_headers_store.clone(), sync_oracle: sync_oracle.clone(), kzg: subspace_link.kzg().clone(), + erasure_coding: subspace_link.erasure_coding().clone(), backend: backend.clone(), }; diff --git a/crates/subspace-service/src/rpc.rs b/crates/subspace-service/src/rpc.rs index 77763e44c62..63931822663 100644 --- a/crates/subspace-service/src/rpc.rs +++ b/crates/subspace-service/src/rpc.rs @@ -44,6 +44,7 @@ use sp_objects::ObjectsApi; use std::sync::Arc; use subspace_core_primitives::crypto::kzg::Kzg; use subspace_core_primitives::BlockNumber; +use subspace_erasure_coding::ErasureCoding; use subspace_networking::libp2p::Multiaddr; use subspace_runtime_primitives::opaque::Block; use subspace_runtime_primitives::{AccountId, Balance, Nonce}; @@ -80,6 +81,8 @@ where pub sync_oracle: SubspaceSyncOracle, /// Kzg instance. pub kzg: Kzg, + /// Erasure coding instance. + pub erasure_coding: ErasureCoding, /// Backend used by the node. pub backend: Arc, } @@ -122,6 +125,7 @@ where segment_headers_store, sync_oracle, kzg, + erasure_coding, backend, } = deps; @@ -144,6 +148,7 @@ where segment_headers_store, sync_oracle, kzg, + erasure_coding, deny_unsafe, })? .into_rpc(), diff --git a/crates/subspace-service/src/sync_from_dsn.rs b/crates/subspace-service/src/sync_from_dsn.rs index de6f1029d23..5db38376249 100644 --- a/crates/subspace-service/src/sync_from_dsn.rs +++ b/crates/subspace-service/src/sync_from_dsn.rs @@ -26,6 +26,7 @@ use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; use subspace_core_primitives::{Piece, PieceIndex, SegmentIndex}; +use subspace_erasure_coding::ErasureCoding; use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator}; use subspace_networking::Node; use tracing::{debug, info, warn}; @@ -96,6 +97,7 @@ pub(super) fn create_observer_and_worker( sync_target_block_number: Arc, pause_sync: Arc, piece_getter: PG, + erasure_coding: ErasureCoding, ) -> ( impl Future + Send + 'static, impl Future> + Send + 'static, @@ -134,6 +136,7 @@ where pause_sync, rx, &piece_getter, + &erasure_coding, ) .await }; @@ -264,6 +267,7 @@ async fn create_worker( pause_sync: Arc, mut notifications: mpsc::Receiver, piece_getter: &PG, + erasure_coding: &ErasureCoding, ) -> Result<(), sc_service::Error> where Block: BlockT, @@ -308,6 +312,7 @@ where import_queue_service, &mut last_processed_segment_index, &mut last_processed_block_number, + erasure_coding, ); let wait_almost_synced_fut = async { loop { diff --git a/crates/subspace-service/src/sync_from_dsn/import_blocks.rs b/crates/subspace-service/src/sync_from_dsn/import_blocks.rs index a24e124dc79..6970285b078 100644 --- a/crates/subspace-service/src/sync_from_dsn/import_blocks.rs +++ b/crates/subspace-service/src/sync_from_dsn/import_blocks.rs @@ -33,6 +33,7 @@ use subspace_archiving::reconstructor::Reconstructor; use subspace_core_primitives::{ ArchivedHistorySegment, BlockNumber, Piece, RecordedHistorySegment, SegmentIndex, }; +use subspace_erasure_coding::ErasureCoding; use subspace_networking::utils::multihash::ToMultihash; use tokio::sync::Semaphore; use tracing::warn; @@ -46,6 +47,7 @@ const WAIT_FOR_BLOCKS_TO_IMPORT: Duration = 
Duration::from_secs(1); /// Starts the process of importing blocks. /// /// Returns number of downloaded blocks. +#[allow(clippy::too_many_arguments)] pub(super) async fn import_blocks_from_dsn( segment_headers_store: &SegmentHeadersStore, segment_header_downloader: &SegmentHeaderDownloader<'_>, @@ -54,6 +56,7 @@ pub(super) async fn import_blocks_from_dsn( import_queue_service: &mut IQS, last_processed_segment_index: &mut SegmentIndex, last_processed_block_number: &mut ::Number, + erasure_coding: &ErasureCoding, ) -> Result where Block: BlockT, @@ -84,7 +87,7 @@ where } let mut imported_blocks = 0; - let mut reconstructor = Reconstructor::new().map_err(|error| error.to_string())?; + let mut reconstructor = Reconstructor::new(erasure_coding.clone()); // Start from the first unprocessed segment and process all segments known so far let segment_indices_iter = (*last_processed_segment_index + SegmentIndex::ONE) ..=segment_headers_store @@ -120,7 +123,7 @@ where if last_archived_block_number <= *last_processed_block_number { *last_processed_segment_index = segment_index; // Reset reconstructor instance - reconstructor = Reconstructor::new().map_err(|error| error.to_string())?; + reconstructor = Reconstructor::new(erasure_coding.clone()); continue; } // Just one partial unprocessed block and this was the last segment available, so nothing to @@ -130,7 +133,7 @@ where && segment_indices_iter.peek().is_none() { // Reset reconstructor instance - reconstructor = Reconstructor::new().map_err(|error| error.to_string())?; + reconstructor = Reconstructor::new(erasure_coding.clone()); continue; } diff --git a/crates/subspace-service/src/sync_from_dsn/snap_sync.rs b/crates/subspace-service/src/sync_from_dsn/snap_sync.rs index 1cfa691f73c..019b2dc38c7 100644 --- a/crates/subspace-service/src/sync_from_dsn/snap_sync.rs +++ b/crates/subspace-service/src/sync_from_dsn/snap_sync.rs @@ -24,6 +24,7 @@ use std::sync::Arc; use std::time::Duration; use subspace_archiving::reconstructor::Reconstructor; use subspace_core_primitives::{BlockNumber, SegmentIndex}; +use subspace_erasure_coding::ErasureCoding; use subspace_networking::Node; use tokio::time::sleep; use tracing::{debug, error}; @@ -39,6 +40,7 @@ pub(crate) async fn snap_sync( piece_getter: PG, network_request: NR, sync_service: Arc>, + erasure_coding: ErasureCoding, ) where Backend: sc_client_api::Backend, Block: BlockT, @@ -74,6 +76,7 @@ pub(crate) async fn snap_sync( &network_request, &sync_service, None, + &erasure_coding, ); match snap_sync_fut.await { @@ -104,6 +107,7 @@ pub(crate) async fn get_blocks_from_target_segment( node: &Node, piece_getter: &PG, target_block: Option, + erasure_coding: &ErasureCoding, ) -> Result)>)>, Error> where AS: AuxStore, @@ -225,7 +229,7 @@ where // Reconstruct blocks of the last segment let mut blocks = VecDeque::new(); { - let mut reconstructor = Reconstructor::new().map_err(|error| error.to_string())?; + let mut reconstructor = Reconstructor::new(erasure_coding.clone()); for segment_index in segments_to_reconstruct { let blocks_fut = @@ -251,6 +255,7 @@ async fn sync( network_request: &NR, sync_service: &SyncingService, target_block: Option, + erasure_coding: &ErasureCoding, ) -> Result<(), Error> where B: sc_client_api::Backend, @@ -273,9 +278,14 @@ where { debug!("Starting snap sync..."); - let Some((target_segment_index, mut blocks)) = - get_blocks_from_target_segment(segment_headers_store, node, piece_getter, target_block) - .await? 
+ let Some((target_segment_index, mut blocks)) = get_blocks_from_target_segment( + segment_headers_store, + node, + piece_getter, + target_block, + erasure_coding, + ) + .await? else { // Snap-sync skipped return Ok(()); diff --git a/test/subspace-test-client/src/lib.rs b/test/subspace-test-client/src/lib.rs index 2b6d13f903c..7b4ba3484bd 100644 --- a/test/subspace-test-client/src/lib.rs +++ b/test/subspace-test-client/src/lib.rs @@ -210,8 +210,8 @@ where Client: BlockBackend + HeaderBackend, { let kzg = Kzg::new(embedded_kzg_settings()); - let mut archiver = subspace_archiving::archiver::Archiver::new(kzg.clone()) - .expect("Incorrect parameters for archiver"); + let mut archiver = + subspace_archiving::archiver::Archiver::new(kzg.clone(), erasure_coding.clone()); let genesis_block = client.block(client.info().genesis_hash).unwrap().unwrap(); let archived_segment = archiver