From d3106bff6c9aa0fa74a67262240551723dec33bf Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 2 Jan 2025 11:47:49 +1000 Subject: [PATCH 1/6] Move ObjectMappingResponse to primitives --- crates/sc-consensus-subspace-rpc/src/lib.rs | 17 ++--------------- crates/subspace-core-primitives/src/objects.rs | 15 +++++++++++++++ 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/crates/sc-consensus-subspace-rpc/src/lib.rs b/crates/sc-consensus-subspace-rpc/src/lib.rs index 6029ec2c41..0dff89e28e 100644 --- a/crates/sc-consensus-subspace-rpc/src/lib.rs +++ b/crates/sc-consensus-subspace-rpc/src/lib.rs @@ -55,11 +55,11 @@ use std::sync::{Arc, Weak}; use std::time::Duration; use subspace_archiving::archiver::NewArchivedSegment; use subspace_core_primitives::hashes::Blake3Hash; -use subspace_core_primitives::objects::GlobalObjectMapping; +use subspace_core_primitives::objects::{GlobalObjectMapping, ObjectMappingResponse}; use subspace_core_primitives::pieces::{Piece, PieceIndex}; use subspace_core_primitives::segments::{HistorySize, SegmentHeader, SegmentIndex}; use subspace_core_primitives::solutions::Solution; -use subspace_core_primitives::{BlockHash, BlockNumber, PublicKey, SlotNumber}; +use subspace_core_primitives::{BlockHash, PublicKey, SlotNumber}; use subspace_erasure_coding::ErasureCoding; use subspace_farmer_components::FarmerProtocolInfo; use subspace_kzg::Kzg; @@ -223,19 +223,6 @@ impl CachedArchivedSegment { } } -/// Response to object mapping subscription, including a block height. -/// Large responses are batched, so the block height can be repeated in different responses. -#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, serde::Serialize, serde::Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct ObjectMappingResponse { - /// The block number that the object mapping is from. - pub block_number: BlockNumber, - - /// The object mappings. - #[serde(flatten)] - pub objects: GlobalObjectMapping, -} - /// Subspace RPC configuration pub struct SubspaceRpcConfig where diff --git a/crates/subspace-core-primitives/src/objects.rs b/crates/subspace-core-primitives/src/objects.rs index 838b66f1b0..467e5a7f80 100644 --- a/crates/subspace-core-primitives/src/objects.rs +++ b/crates/subspace-core-primitives/src/objects.rs @@ -24,6 +24,7 @@ extern crate alloc; use crate::hashes::Blake3Hash; use crate::pieces::PieceIndex; +use crate::BlockNumber; #[cfg(not(feature = "std"))] use alloc::vec::Vec; use core::default::Default; @@ -172,3 +173,17 @@ impl GlobalObjectMapping { } } } + +/// Response to object mapping subscription, including a block height. +/// Large responses are batched, so the block height can be repeated in different responses. +#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))] +pub struct ObjectMappingResponse { + /// The block number that the object mapping is from. + pub block_number: BlockNumber, + + /// The object mappings. 
+ #[cfg_attr(feature = "serde", serde(flatten))] + pub objects: GlobalObjectMapping, +} From 195e74749ace3c8de790e15b138cd18fe56e05d4 Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 2 Jan 2025 12:02:24 +1000 Subject: [PATCH 2/6] Use ObjectMappingResponse as the indexer format --- .../src/commands/http/server.rs | 44 +++++-------------- 1 file changed, 12 insertions(+), 32 deletions(-) diff --git a/crates/subspace-gateway/src/commands/http/server.rs b/crates/subspace-gateway/src/commands/http/server.rs index d222509e63..c8c9dde1ed 100644 --- a/crates/subspace-gateway/src/commands/http/server.rs +++ b/crates/subspace-gateway/src/commands/http/server.rs @@ -1,12 +1,9 @@ //! HTTP server which fetches objects from the DSN based on a hash, using a mapping indexer service. use actix_web::{web, App, HttpResponse, HttpServer, Responder}; -use serde::{Deserialize, Deserializer, Serialize}; -use std::default::Default; use std::sync::Arc; use subspace_core_primitives::hashes::{blake3_hash, Blake3Hash}; -use subspace_core_primitives::pieces::PieceIndex; -use subspace_core_primitives::BlockNumber; +use subspace_core_primitives::objects::ObjectMappingResponse; use subspace_data_retrieval::object_fetcher::ObjectFetcher; use subspace_data_retrieval::piece_getter::PieceGetter; use tracing::{debug, error, trace}; @@ -21,39 +18,18 @@ where pub(crate) http_endpoint: String, } -/// Object mapping format from the indexer service. -#[derive(Serialize, Deserialize, Debug, Default)] -#[serde(rename_all = "camelCase")] -struct ObjectMapping { +/// Requests the object mapping with `hash` from the indexer service. +async fn request_object_mapping( + endpoint: &str, hash: Blake3Hash, - piece_index: PieceIndex, - piece_offset: u32, - #[serde(deserialize_with = "string_to_u32")] - block_number: BlockNumber, -} - -/// Utility function to deserialize a JSON string into a u32. -fn string_to_u32<'de, D>(deserializer: D) -> Result -where - D: Deserializer<'de>, -{ - let s: String = Deserialize::deserialize(deserializer)?; - s.parse::().map_err(serde::de::Error::custom) -} - -/// Requests an object mapping with `hash` from the indexer service. -async fn request_object_mapping(endpoint: &str, hash: Blake3Hash) -> anyhow::Result { +) -> anyhow::Result { let client = reqwest::Client::new(); let object_mappings_url = format!("{}/objects/{}", endpoint, hex::encode(hash)); debug!(?hash, ?object_mappings_url, "Requesting object mapping..."); - let response = client - .get(&object_mappings_url) - .send() - .await? 
- .json::() - .await; + let response = client.get(&object_mappings_url).send().await?.json().await; + match &response { Ok(json) => { trace!(?hash, ?json, "Received object mapping"); @@ -82,6 +58,10 @@ where return HttpResponse::BadRequest().finish(); }; + let Some(object_mapping) = object_mapping.objects.objects().first() else { + return HttpResponse::BadRequest().finish(); + }; + if object_mapping.hash != hash { error!( ?object_mapping, @@ -93,7 +73,7 @@ where let object_fetcher_result = server_params .object_fetcher - .fetch_object(object_mapping.piece_index, object_mapping.piece_offset) + .fetch_object(object_mapping.piece_index, object_mapping.offset) .await; let object = match object_fetcher_result { From 44480e00e4fb0fae9bc3c643206adec6c6c71f38 Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 2 Jan 2025 13:06:00 +1000 Subject: [PATCH 3/6] Move hash checks to ObjectFetcher and cleanup errors --- Cargo.lock | 1 + crates/subspace-gateway-rpc/src/lib.rs | 34 +- .../src/commands/http/server.rs | 20 +- shared/subspace-data-retrieval/Cargo.toml | 1 + .../src/object_fetcher.rs | 367 +++++++++--------- .../src/piece_fetcher.rs | 2 +- 6 files changed, 195 insertions(+), 230 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b48bc57df2..2ef972585e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12592,6 +12592,7 @@ dependencies = [ "anyhow", "async-trait", "futures", + "hex", "parity-scale-codec", "subspace-archiving", "subspace-core-primitives", diff --git a/crates/subspace-gateway-rpc/src/lib.rs b/crates/subspace-gateway-rpc/src/lib.rs index f50b3e521c..c907ed844c 100644 --- a/crates/subspace-gateway-rpc/src/lib.rs +++ b/crates/subspace-gateway-rpc/src/lib.rs @@ -5,11 +5,10 @@ use jsonrpsee::proc_macros::rpc; use jsonrpsee::types::{ErrorObject, ErrorObjectOwned}; use std::fmt; use std::ops::{Deref, DerefMut}; -use subspace_core_primitives::hashes::{blake3_hash, Blake3Hash}; use subspace_core_primitives::objects::GlobalObjectMapping; use subspace_data_retrieval::object_fetcher::{self, ObjectFetcher}; use subspace_data_retrieval::piece_getter::PieceGetter; -use tracing::{debug, error, trace}; +use tracing::{debug, error}; const SUBSPACE_ERROR: i32 = 9000; @@ -33,17 +32,6 @@ pub enum Error { /// The object fetcher failed. #[error(transparent)] ObjectFetcherError(#[from] object_fetcher::Error), - - /// The returned object data did not match the hash in the mapping. - #[error( - "Invalid object hash, mapping had {mapping_hash:?}, but fetched data had {data_hash:?}" - )] - InvalidObjectHash { - /// The expected hash from the mapping. - mapping_hash: Blake3Hash, - /// The actual hash of the returned object data. 
- data_hash: Blake3Hash, - }, } impl From for ErrorObjectOwned { @@ -147,25 +135,7 @@ where let mut objects = Vec::with_capacity(count); // TODO: fetch concurrently for mapping in mappings.objects() { - let data = self - .object_fetcher - .fetch_object(mapping.piece_index, mapping.offset) - .await?; - - let data_hash = blake3_hash(&data); - if data_hash != mapping.hash { - error!( - ?data_hash, - data_size = %data.len(), - ?mapping.hash, - "Retrieved data did not match mapping hash", - ); - trace!(data = %hex::encode(data), "Retrieved data"); - return Err(Error::InvalidObjectHash { - mapping_hash: mapping.hash, - data_hash, - }); - } + let data = self.object_fetcher.fetch_object(*mapping).await?; objects.push(data.into()); } diff --git a/crates/subspace-gateway/src/commands/http/server.rs b/crates/subspace-gateway/src/commands/http/server.rs index c8c9dde1ed..6efbd421e3 100644 --- a/crates/subspace-gateway/src/commands/http/server.rs +++ b/crates/subspace-gateway/src/commands/http/server.rs @@ -2,7 +2,7 @@ use actix_web::{web, App, HttpResponse, HttpServer, Responder}; use std::sync::Arc; -use subspace_core_primitives::hashes::{blake3_hash, Blake3Hash}; +use subspace_core_primitives::hashes::Blake3Hash; use subspace_core_primitives::objects::ObjectMappingResponse; use subspace_data_retrieval::object_fetcher::ObjectFetcher; use subspace_data_retrieval::piece_getter::PieceGetter; @@ -58,7 +58,8 @@ where return HttpResponse::BadRequest().finish(); }; - let Some(object_mapping) = object_mapping.objects.objects().first() else { + // TODO: fetch multiple objects + let Some(&object_mapping) = object_mapping.objects.objects().first() else { return HttpResponse::BadRequest().finish(); }; @@ -73,25 +74,12 @@ where let object_fetcher_result = server_params .object_fetcher - .fetch_object(object_mapping.piece_index, object_mapping.offset) + .fetch_object(object_mapping) .await; let object = match object_fetcher_result { Ok(object) => { trace!(?hash, size = %object.len(), "Object fetched successfully"); - - let data_hash = blake3_hash(&object); - if data_hash != hash { - error!( - ?data_hash, - data_size = %object.len(), - ?hash, - "Retrieved data doesn't match requested mapping hash" - ); - trace!(data = %hex::encode(object), "Retrieved data"); - return HttpResponse::ServiceUnavailable().finish(); - } - object } Err(err) => { diff --git a/shared/subspace-data-retrieval/Cargo.toml b/shared/subspace-data-retrieval/Cargo.toml index 5f53ed11c2..1843628c25 100644 --- a/shared/subspace-data-retrieval/Cargo.toml +++ b/shared/subspace-data-retrieval/Cargo.toml @@ -15,6 +15,7 @@ include = [ anyhow = "1.0.89" async-trait = "0.1.83" futures = "0.3.31" +hex = "0.4.3" parity-scale-codec = { version = "3.6.12", features = ["derive"] } subspace-archiving = { version = "0.1.0", path = "../../crates/subspace-archiving" } subspace-core-primitives = { version = "0.1.0", path = "../../crates/subspace-core-primitives" } diff --git a/shared/subspace-data-retrieval/src/object_fetcher.rs b/shared/subspace-data-retrieval/src/object_fetcher.rs index b1c50444ff..179bc513ac 100644 --- a/shared/subspace-data-retrieval/src/object_fetcher.rs +++ b/shared/subspace-data-retrieval/src/object_fetcher.rs @@ -21,6 +21,8 @@ use crate::segment_downloading::{download_segment, SegmentDownloadingError}; use parity_scale_codec::{Compact, CompactLen, Decode, Encode}; use std::sync::Arc; use subspace_archiving::archiver::{Segment, SegmentItem}; +use subspace_core_primitives::hashes::{blake3_hash, Blake3Hash}; +use 
subspace_core_primitives::objects::GlobalObject; use subspace_core_primitives::pieces::{Piece, PieceIndex, RawRecord}; use subspace_core_primitives::segments::{RecordedHistorySegment, SegmentIndex}; use subspace_erasure_coding::ErasureCoding; @@ -43,100 +45,99 @@ pub const MAX_SUPPORTED_OBJECT_LENGTH: usize = 1024 * 1024 * 1024 - 1; #[derive(Debug, thiserror::Error)] pub enum Error { /// Supplied piece index is not a source piece - #[error("Piece index {piece_index} is not a source piece, offset: {piece_offset}")] - NotSourcePiece { - piece_index: PieceIndex, - piece_offset: u32, - }, + #[error("Piece index is not a source piece, object: {mapping:?}")] + NotSourcePiece { mapping: GlobalObject }, /// Supplied piece offset is too large - #[error("Piece offset {piece_offset} is too large, must be less than {}, piece index: {piece_index}", RawRecord::SIZE)] - PieceOffsetTooLarge { - piece_index: PieceIndex, - piece_offset: u32, - }, + #[error( + "Piece offset is too large, must be less than {}, object: {mapping:?}", + RawRecord::SIZE + )] + PieceOffsetTooLarge { mapping: GlobalObject }, /// No item in segment at offset - #[error("Offset {offset_in_segment} in segment {segment_index} is not an item, current progress: {progress}, object: {piece_index:?}, {piece_offset}")] + #[error("Offset {offset_in_segment} in segment {segment_index} is not an item, current progress: {progress}, object: {mapping:?}")] NoSegmentItem { progress: usize, offset_in_segment: usize, segment_index: SegmentIndex, - piece_index: PieceIndex, - piece_offset: u32, + mapping: GlobalObject, }, /// Unexpected item in first segment at offset - #[error("Offset {offset_in_segment} in first segment {segment_index} has unexpected item, current progress: {segment_progress}, object: {piece_index:?}, {piece_offset}, item: {segment_item:?}")] + #[error("Offset {offset_in_segment} in first segment {segment_index} has unexpected item, current progress: {segment_progress}, object: {mapping:?}, item: {segment_item:?}")] UnexpectedFirstSegmentItem { segment_progress: usize, offset_in_segment: usize, segment_index: SegmentIndex, segment_item: Box, - piece_index: PieceIndex, - piece_offset: u32, + mapping: GlobalObject, }, /// Unexpected item in continuing segment at offset - #[error("Continuing segment {segment_index} has unexpected item, collected data: {collected_data}, object: {piece_index:?}, {piece_offset}, item: {segment_item:?}")] + #[error("Continuing segment {segment_index} has unexpected item, collected data: {collected_data}, object: {mapping:?}, item: {segment_item:?}")] UnexpectedContinuingSegmentItem { collected_data: usize, segment_index: SegmentIndex, segment_item: Box, - piece_index: PieceIndex, - piece_offset: u32, + mapping: GlobalObject, }, /// Object not found after downloading expected number of segments - #[error("Object segment range {first_segment_index}..={last_segment_index} did not contain full object, object: {piece_index:?}, {piece_offset}")] + #[error("Object segment range {first_segment_index}..={last_segment_index} did not contain full object, object: {mapping:?}")] TooManySegments { first_segment_index: SegmentIndex, last_segment_index: SegmentIndex, - piece_index: PieceIndex, - piece_offset: u32, + mapping: GlobalObject, }, /// Object is too large error #[error( - "Data length {data_length} exceeds maximum object size {max_object_len} for object: {piece_index:?}, {piece_offset}" + "Data length {data_length} exceeds maximum object size {max_object_len} for object: {mapping:?}" )] ObjectTooLarge { 
data_length: usize, max_object_len: usize, - piece_index: PieceIndex, - piece_offset: u32, + mapping: GlobalObject, }, /// Length prefix is too large error #[error( - "Length prefix length {length_prefix_len} exceeds maximum object size {max_object_len} for object: {piece_index:?}, {piece_offset}" + "Length prefix length {length_prefix_len} exceeds maximum object size {max_object_len} for object: {mapping:?}" )] LengthPrefixTooLarge { length_prefix_len: usize, max_object_len: usize, - piece_index: PieceIndex, - piece_offset: u32, + mapping: GlobalObject, + }, + + /// Hash doesn't match data + #[error("Incorrect data hash {data_hash:?} for {data_size} byte object: {mapping:?}")] + InvalidDataHash { + data_hash: Blake3Hash, + data_size: usize, + mapping: GlobalObject, }, /// Object decoding error - #[error("Object data decoding error: {source:?}")] + #[error("Object data decoding error: {source:?}, object: {mapping:?}")] ObjectDecoding { - #[from] source: parity_scale_codec::Error, + mapping: GlobalObject, }, /// Segment getter error - #[error("Getting segment failed: {source:?}")] + #[error("Getting segment failed: {source:?}, object: {mapping:?}")] SegmentGetter { - #[from] source: SegmentDownloadingError, + mapping: GlobalObject, }, /// Piece getter error - #[error("Getting piece caused an error: {source:?}")] + #[error("Getting piece caused an error: {source:?}, object: {mapping:?}")] PieceGetterError { - #[from] source: anyhow::Error, + mapping: GlobalObject, }, /// Piece getter couldn't find the piece @@ -192,82 +193,91 @@ where } } - /// Assemble the object in `piece_index` at `piece_offset` by fetching necessary pieces using - /// the piece getter and putting the object's bytes together. + /// Assemble the object in `mapping` by fetching necessary pieces using the piece getter, and + /// putting the object's bytes together. /// - /// The caller should check the object's hash to make sure the correct bytes are returned. - pub async fn fetch_object( - &self, - piece_index: PieceIndex, - piece_offset: u32, - ) -> Result, Error> { + /// Checks the object's hash to make sure the correct bytes are returned. + pub async fn fetch_object(&self, mapping: GlobalObject) -> Result, Error> { + let GlobalObject { + hash, + piece_index, + offset, + } = mapping; + // Validate parameters if !piece_index.is_source() { tracing::debug!( - %piece_index, - piece_offset, + ?mapping, "Invalid piece index for object: must be a source piece", ); // Parity pieces contain effectively random data, and can't be used to fetch objects - return Err(Error::NotSourcePiece { - piece_index, - piece_offset, - }); + return Err(Error::NotSourcePiece { mapping }); } - if piece_offset >= RawRecord::SIZE as u32 { + if offset >= RawRecord::SIZE as u32 { tracing::debug!( - %piece_index, - piece_offset, + ?mapping, RawRecord_SIZE = RawRecord::SIZE, "Invalid piece offset for object: must be less than the size of a raw record", ); - return Err(Error::PieceOffsetTooLarge { - piece_index, - piece_offset, - }); + return Err(Error::PieceOffsetTooLarge { mapping }); } - // Try fast object assembling from individual pieces - if let Some(data) = self.fetch_object_fast(piece_index, piece_offset).await? { + // Try fast object assembling from individual pieces, + // then regular object assembling from segments + let data = match self.fetch_object_fast(mapping).await? 
{ + Some(data) => data, + None => { + let data = self.fetch_object_regular(mapping).await?; + + debug!( + ?mapping, + len = %data.len(), + "Fetched object using regular object assembling", + + ); + + data + } + }; + + let data_hash = blake3_hash(&data); + if data_hash != hash { tracing::debug!( - %piece_index, - piece_offset, - len = %data.len(), - "Fetched object using fast object assembling", + ?data_hash, + data_size = %data.len(), + ?mapping, + "Retrieved data doesn't match requested mapping hash" ); + tracing::trace!(data = %hex::encode(&data), "Retrieved data"); - return Ok(data); + return Err(Error::InvalidDataHash { + data_hash, + data_size: data.len(), + mapping, + }); } - // Regular object assembling from segments - let data = self.fetch_object_regular(piece_index, piece_offset).await?; - - tracing::debug!( - %piece_index, - piece_offset, - len = %data.len(), - "Fetched object using regular object assembling", - ); - Ok(data) } /// Fast object fetching and assembling where the object doesn't cross piece (super fast) or /// segment (just fast) boundaries, returns `Ok(None)` if fast retrieval is not guaranteed. // TODO: return already downloaded pieces from fetch_object_fast() and pass them to fetch_object_regular() - async fn fetch_object_fast( - &self, - piece_index: PieceIndex, - piece_offset: u32, - ) -> Result>, Error> { + async fn fetch_object_fast(&self, mapping: GlobalObject) -> Result>, Error> { + let GlobalObject { + piece_index, + offset, + .. + } = mapping; + // If the offset is before the last few bytes of a segment, we might be able to do very // fast object retrieval without assembling and processing the whole segment. // // The last few bytes might contain padding if a piece is the last piece in the segment. - let before_max_padding = piece_offset as usize <= RawRecord::SIZE - 1 - MAX_SEGMENT_PADDING; + let before_max_padding = offset as usize <= RawRecord::SIZE - 1 - MAX_SEGMENT_PADDING; let piece_position_in_segment = piece_index.source_position(); let data_shards = RecordedHistorySegment::NUM_RAW_RECORDS as u32; let last_data_piece_in_segment = piece_position_in_segment >= data_shards - 1; @@ -275,8 +285,7 @@ where if last_data_piece_in_segment && !before_max_padding { trace!( piece_position_in_segment, - %piece_index, - piece_offset, + ?mapping, "Fast object retrieval not possible: last source piece in segment, \ and start of object length bytes is in potential segment padding", ); @@ -290,7 +299,7 @@ where // // The last few bytes might contain padding if a piece is the last piece in the segment. 
let bytes_available_in_segment = - (data_shards - piece_position_in_segment) * RawRecord::SIZE as u32 - piece_offset; + (data_shards - piece_position_in_segment) * RawRecord::SIZE as u32 - offset; let Some(bytes_available_in_segment) = bytes_available_in_segment.checked_sub(MAX_SEGMENT_PADDING as u32) else { @@ -302,9 +311,7 @@ where let mut read_records_data = Vec::::with_capacity(RawRecord::SIZE * 2); let mut next_source_piece_index = piece_index; - let piece = self - .read_piece(next_source_piece_index, piece_index, piece_offset) - .await?; + let piece = self.read_piece(next_source_piece_index, mapping).await?; next_source_piece_index = next_source_piece_index.next_source_index(); // Discard piece data before the offset read_records_data.extend( @@ -312,7 +319,7 @@ where .record() .to_raw_record_chunks() .flatten() - .skip(piece_offset as usize) + .skip(offset as usize) .copied(), ); @@ -322,7 +329,7 @@ where read_records_data.truncate(read_records_data.len() - MAX_SEGMENT_PADDING); } - let data_length = self.decode_data_length(&read_records_data, piece_index, piece_offset)?; + let data_length = self.decode_data_length(&read_records_data, mapping)?; let data_length = if let Some(data_length) = data_length { data_length @@ -333,25 +340,21 @@ where %next_source_piece_index, piece_position_in_segment, bytes_available_in_segment, - %piece_index, - piece_offset, + ?mapping, "Part of object length bytes is in next piece, fetching", ); - let piece = self - .read_piece(next_source_piece_index, piece_index, piece_offset) - .await?; + let piece = self.read_piece(next_source_piece_index, mapping).await?; next_source_piece_index = next_source_piece_index.next_source_index(); read_records_data.extend(piece.record().to_raw_record_chunks().flatten().copied()); - self.decode_data_length(&read_records_data, piece_index, piece_offset)? + self.decode_data_length(&read_records_data, mapping)? .expect("Extra RawRecord is larger than the length encoding; qed") } else { trace!( piece_position_in_segment, bytes_available_in_segment, - %piece_index, - piece_offset, + ?mapping, "Fast object retrieval not possible: last source piece in segment, \ and part of object length bytes is in potential segment padding", ); @@ -366,8 +369,7 @@ where data_length, bytes_available_in_segment, piece_position_in_segment, - %piece_index, - piece_offset, + ?mapping, "Fast object retrieval not possible: part of object data bytes is in \ potential segment padding", ); @@ -384,7 +386,7 @@ where .filter(|i| i.is_source()) .take(remaining_piece_count) .collect::>(); - self.read_pieces(remaining_piece_indexes) + self.read_pieces(&remaining_piece_indexes, mapping) .await? .into_iter() .for_each(|piece| { @@ -394,35 +396,46 @@ where } // Decode the data, and return it if it's valid - let read_records_data = Vec::::decode(&mut read_records_data.as_slice())?; + let read_records_data = Vec::::decode(&mut read_records_data.as_slice()) + .map_err(|source| Error::ObjectDecoding { source, mapping })?; + + debug!( + ?mapping, + len = %read_records_data.len(), + "Fetched object using fast object assembling", + ); Ok(Some(read_records_data)) } /// Fetch and assemble an object that can cross segment boundaries, which requires assembling /// and iterating over full segments. - async fn fetch_object_regular( - &self, - piece_index: PieceIndex, - piece_offset: u32, - ) -> Result, Error> { + async fn fetch_object_regular(&self, mapping: GlobalObject) -> Result, Error> { + let GlobalObject { + piece_index, + offset, + .. 
+ } = mapping; + let mut segment_index = piece_index.segment_index(); let piece_position_in_segment = piece_index.source_position(); // Used to access the data after it is converted to raw bytes let offset_in_segment = - piece_position_in_segment as usize * RawRecord::SIZE + piece_offset as usize; + piece_position_in_segment as usize * RawRecord::SIZE + offset as usize; tracing::trace!( %segment_index, offset_in_segment, piece_position_in_segment, - %piece_index, - piece_offset, + ?mapping, "Fetching object from segment(s)", ); let mut data = { - let items = self.read_segment(segment_index).await?.into_items(); + let items = self + .read_segment(segment_index, mapping) + .await? + .into_items(); // Go through the segment until we reach the offset. // Unconditional progress is enum variant + compact encoding of number of elements let mut progress = 1 + Compact::compact_len(&(items.len() as u64)); @@ -441,8 +454,7 @@ where progress, offset_in_segment, ?segment_index, - %piece_index, - piece_offset, + ?mapping, "Failed to find item at offset in segment" ); @@ -450,8 +462,7 @@ where progress, offset_in_segment, segment_index, - piece_index, - piece_offset, + mapping, } })?; @@ -460,8 +471,7 @@ where %segment_index, offset_in_segment, piece_position_in_segment, - %piece_index, - piece_offset, + ?mapping, segment_item = format!("{segment_item:?}").chars().take(50).collect::(), "Found item at offset in first segment", ); @@ -484,8 +494,7 @@ where segment_progress = progress, offset_in_segment, %segment_index, - %piece_index, - piece_offset, + ?mapping, segment_item = format!("{segment_item:?}").chars().take(50).collect::(), "Unexpected segment item in first segment", ); @@ -494,8 +503,7 @@ where segment_progress: progress, offset_in_segment, segment_index, - piece_index, - piece_offset, + mapping, segment_item: Box::new(segment_item), }); } @@ -506,20 +514,18 @@ where %segment_index, offset_in_segment, piece_position_in_segment, - %piece_index, - piece_offset, + ?mapping, data_len = data.len(), "Got data at offset in first segment", ); // Return an error if the length is unreasonably large, before we get the next segment - if let Some(data_length) = - self.decode_data_length(data.as_slice(), piece_index, piece_offset)? - { + if let Some(data_length) = self.decode_data_length(data.as_slice(), mapping)? { // If we have the whole object, decode and return it. // TODO: use tokio Bytes type to re-use the same allocation by stripping the length at the start if data.len() >= data_length { - return Ok(Vec::::decode(&mut data.as_slice())?); + return Vec::::decode(&mut data.as_slice()) + .map_err(|source| Error::ObjectDecoding { source, mapping }); } } @@ -528,17 +534,21 @@ where // headers and optional padding. loop { segment_index += SegmentIndex::ONE; - let items = self.read_segment(segment_index).await?.into_items(); + let items = self + .read_segment(segment_index, mapping) + .await? + .into_items(); for segment_item in items { match segment_item { SegmentItem::BlockContinuation { bytes, .. } => { data.extend_from_slice(&bytes); if let Some(data_length) = - self.decode_data_length(data.as_slice(), piece_index, piece_offset)? + self.decode_data_length(data.as_slice(), mapping)? 
{ if data.len() >= data_length { - return Ok(Vec::::decode(&mut data.as_slice())?); + return Vec::::decode(&mut data.as_slice()) + .map_err(|source| Error::ObjectDecoding { source, mapping }); } } } @@ -552,8 +562,7 @@ where debug!( collected_data = ?data.len(), %segment_index, - %piece_index, - piece_offset, + ?mapping, segment_item = format!("{segment_item:?}").chars().take(50).collect::(), "Unexpected segment item in continuing segment", ); @@ -561,8 +570,7 @@ where return Err(Error::UnexpectedContinuingSegmentItem { collected_data: data.len(), segment_index, - piece_index, - piece_offset, + mapping, segment_item: Box::new(segment_item), }); } @@ -572,66 +580,70 @@ where } /// Read the whole segment by its index (just records, skipping witnesses). - async fn read_segment(&self, segment_index: SegmentIndex) -> Result { - Ok(download_segment( + /// + /// The mapping is only used for error reporting. + async fn read_segment( + &self, + segment_index: SegmentIndex, + mapping: GlobalObject, + ) -> Result { + download_segment( segment_index, &self.piece_getter, self.erasure_coding.clone(), ) - .await?) + .await + .map_err(|source| Error::SegmentGetter { source, mapping }) } /// Concurrently read multiple pieces, and return them in the supplied order. - async fn read_pieces(&self, piece_indexes: Vec) -> Result, Error> { + /// + /// The mapping is only used for error reporting. + async fn read_pieces( + &self, + piece_indexes: &Vec, + mapping: GlobalObject, + ) -> Result, Error> { download_pieces(piece_indexes, &self.piece_getter) .await - .map_err(|source| Error::PieceGetterError { source }) + .map_err(|source| { + debug!( + ?piece_indexes, + error = ?source, + ?mapping, + "Error fetching pieces during object assembling" + ); + + Error::PieceGetterError { source, mapping } + }) } /// Read and return a single piece. /// - /// The mapping piece index and offset are only used for error reporting. + /// The mapping is only used for error reporting. async fn read_piece( &self, piece_index: PieceIndex, - mapping_piece_index: PieceIndex, - mapping_piece_offset: u32, + mapping: GlobalObject, ) -> Result { - let piece = self - .piece_getter - .get_piece(piece_index) + download_pieces(&vec![piece_index], &self.piece_getter) .await - .inspect_err(|source| { + .map(|pieces| { + pieces + .first() + .expect("download_pieces always returns exact pieces or error") + .clone() + }) + .map_err(|source| { debug!( %piece_index, error = ?source, - %mapping_piece_index, - mapping_piece_offset, + ?mapping, "Error fetching piece during object assembling" ); - })?; - if let Some(piece) = piece { - trace!( - %piece_index, - %mapping_piece_index, - mapping_piece_offset, - "Fetched piece during object assembling" - ); - - Ok(piece) - } else { - debug!( - %piece_index, - %mapping_piece_index, - mapping_piece_offset, - "Piece not found during object assembling" - ); - - Err(Error::PieceNotFound { - piece_index: mapping_piece_index, - })? - } + Error::PieceGetterError { source, mapping } + }) } /// Validate and decode the encoded length of `data`, including the encoded length bytes. @@ -640,12 +652,11 @@ where /// Returns `Ok(Some(data_length_encoded_length + data_length))` if the length is valid, /// `Ok(None)` if there aren't enough bytes to decode the length, otherwise an error. /// - /// The mapping piece index and offset are only used for error reporting. + /// The mapping is only used for error reporting. 
fn decode_data_length( &self, mut data: &[u8], - mapping_piece_index: PieceIndex, - mapping_piece_offset: u32, + mapping: GlobalObject, ) -> Result, Error> { let data_length = match Compact::::decode(&mut data) { Ok(Compact(data_length)) => { @@ -654,16 +665,14 @@ where debug!( data_length, max_object_len = self.max_object_len, - %mapping_piece_index, - mapping_piece_offset, + ?mapping, "Data length exceeds object size limit for object fetcher" ); return Err(Error::ObjectTooLarge { data_length, max_object_len: self.max_object_len, - piece_index: mapping_piece_index, - piece_offset: mapping_piece_offset, + mapping, }); } @@ -677,23 +686,20 @@ where debug!( length_prefix_len = data.len(), max_object_len = self.max_object_len, - %mapping_piece_index, - mapping_piece_offset, + ?mapping, "Length prefix exceeds object size limit for object fetcher" ); return Err(Error::LengthPrefixTooLarge { length_prefix_len: data.len(), max_object_len: self.max_object_len, - piece_index: mapping_piece_index, - piece_offset: mapping_piece_offset, + mapping, }); } debug!( ?err, - %mapping_piece_index, - mapping_piece_offset, + ?mapping, "Not enough bytes to decode data length for object" ); @@ -706,8 +712,7 @@ where trace!( data_length, data_length_encoded_length, - %mapping_piece_index, - mapping_piece_offset, + ?mapping, "Decoded data length for object" ); diff --git a/shared/subspace-data-retrieval/src/piece_fetcher.rs b/shared/subspace-data-retrieval/src/piece_fetcher.rs index 49f13a2ff4..ea702a1606 100644 --- a/shared/subspace-data-retrieval/src/piece_fetcher.rs +++ b/shared/subspace-data-retrieval/src/piece_fetcher.rs @@ -28,7 +28,7 @@ use tracing::{debug, trace}; // This code was copied and modified from subspace_service::sync_from_dsn::download_and_reconstruct_blocks(): // pub async fn download_pieces( - piece_indexes: Vec, + piece_indexes: &Vec, piece_getter: &PG, ) -> anyhow::Result> where From 0c0577dc24d68439fea717467c2fc35ab952633e Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 2 Jan 2025 13:08:13 +1000 Subject: [PATCH 4/6] Use tracing log macros directly --- .../subspace-data-retrieval/src/object_fetcher.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/shared/subspace-data-retrieval/src/object_fetcher.rs b/shared/subspace-data-retrieval/src/object_fetcher.rs index 179bc513ac..1264b9a613 100644 --- a/shared/subspace-data-retrieval/src/object_fetcher.rs +++ b/shared/subspace-data-retrieval/src/object_fetcher.rs @@ -206,7 +206,7 @@ where // Validate parameters if !piece_index.is_source() { - tracing::debug!( + debug!( ?mapping, "Invalid piece index for object: must be a source piece", ); @@ -216,7 +216,7 @@ where } if offset >= RawRecord::SIZE as u32 { - tracing::debug!( + debug!( ?mapping, RawRecord_SIZE = RawRecord::SIZE, "Invalid piece offset for object: must be less than the size of a raw record", @@ -245,13 +245,13 @@ where let data_hash = blake3_hash(&data); if data_hash != hash { - tracing::debug!( + debug!( ?data_hash, data_size = %data.len(), ?mapping, "Retrieved data doesn't match requested mapping hash" ); - tracing::trace!(data = %hex::encode(&data), "Retrieved data"); + trace!(data = %hex::encode(&data), "Retrieved data"); return Err(Error::InvalidDataHash { data_hash, @@ -423,7 +423,7 @@ where let offset_in_segment = piece_position_in_segment as usize * RawRecord::SIZE + offset as usize; - tracing::trace!( + trace!( %segment_index, offset_in_segment, piece_position_in_segment, @@ -466,7 +466,7 @@ where } })?; - tracing::trace!( + trace!( progress, 
%segment_index, offset_in_segment, @@ -510,7 +510,7 @@ where } }; - tracing::trace!( + trace!( %segment_index, offset_in_segment, piece_position_in_segment, From 6699fa6cfcbd51c8368cfc82aa6c51059ffff4cd Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 2 Jan 2025 14:19:49 +1000 Subject: [PATCH 5/6] Fetch object batches in RPC and HTTP gateway --- crates/subspace-gateway-rpc/src/lib.rs | 9 +- .../src/commands/http/server.rs | 78 +++++++----- crates/subspace-gateway/src/main.rs | 2 + .../src/object_fetcher.rs | 118 ++++++++++-------- 4 files changed, 118 insertions(+), 89 deletions(-) diff --git a/crates/subspace-gateway-rpc/src/lib.rs b/crates/subspace-gateway-rpc/src/lib.rs index c907ed844c..6dd689b46b 100644 --- a/crates/subspace-gateway-rpc/src/lib.rs +++ b/crates/subspace-gateway-rpc/src/lib.rs @@ -132,13 +132,8 @@ where return Err(Error::TooManyMappings { count }); } - let mut objects = Vec::with_capacity(count); - // TODO: fetch concurrently - for mapping in mappings.objects() { - let data = self.object_fetcher.fetch_object(*mapping).await?; - - objects.push(data.into()); - } + let objects = self.object_fetcher.fetch_objects(mappings).await?; + let objects = objects.into_iter().map(HexData::from).collect(); Ok(objects) } diff --git a/crates/subspace-gateway/src/commands/http/server.rs b/crates/subspace-gateway/src/commands/http/server.rs index 6efbd421e3..6e30883b1a 100644 --- a/crates/subspace-gateway/src/commands/http/server.rs +++ b/crates/subspace-gateway/src/commands/http/server.rs @@ -18,79 +18,101 @@ where pub(crate) http_endpoint: String, } -/// Requests the object mapping with `hash` from the indexer service. +/// Requests the object mappings for `hashes` from the indexer service. +/// Multiple hashes are separated by `+`. async fn request_object_mapping( endpoint: &str, - hash: Blake3Hash, + hashes: &Vec, ) -> anyhow::Result { let client = reqwest::Client::new(); - let object_mappings_url = format!("{}/objects/{}", endpoint, hex::encode(hash)); + let hash_list = hashes.iter().map(hex::encode).collect::>(); + let object_mappings_url = format!("{}/objects/{}", endpoint, hash_list.join("+")); - debug!(?hash, ?object_mappings_url, "Requesting object mapping..."); + debug!( + ?hashes, + ?object_mappings_url, + "Requesting object mappings..." + ); let response = client.get(&object_mappings_url).send().await?.json().await; match &response { Ok(json) => { - trace!(?hash, ?json, "Received object mapping"); + trace!(?hashes, ?json, "Received object mappings"); } Err(err) => { - error!(?hash, ?err, ?object_mappings_url, "Request failed"); + error!(?hashes, ?err, ?object_mappings_url, "Request failed"); } } response.map_err(|err| err.into()) } -/// Fetches a DSN object with `hash`, using the mapping indexer service. +/// Fetches the DSN objects with `hashes`, using the mapping indexer service. +/// Multiple hashes are separated by `+`. 
async fn serve_object( - hash: web::Path, + hashes: web::Path, additional_data: web::Data>>, ) -> impl Responder where PG: PieceGetter + Send + Sync + 'static, { let server_params = additional_data.into_inner(); - let hash = hash.into_inner(); + let hashes = hashes.into_inner(); + let hashes = hashes + .split('+') + .map(|s| { + let mut hash = Blake3Hash::default(); + hex::decode_to_slice(s, hash.as_mut()).map(|()| hash) + }) + .try_collect::>(); - let Ok(object_mapping) = request_object_mapping(&server_params.indexer_endpoint, hash).await - else { + let Ok(hashes) = hashes else { return HttpResponse::BadRequest().finish(); }; - // TODO: fetch multiple objects - let Some(&object_mapping) = object_mapping.objects.objects().first() else { + let Ok(object_mappings) = + request_object_mapping(&server_params.indexer_endpoint, &hashes).await + else { return HttpResponse::BadRequest().finish(); }; - if object_mapping.hash != hash { - error!( - ?object_mapping, - ?hash, - "Returned object mapping doesn't match requested hash" - ); - return HttpResponse::ServiceUnavailable().finish(); + for object_mapping in object_mappings.objects.objects() { + if !hashes.contains(&object_mapping.hash) { + error!( + ?object_mapping, + ?hashes, + "Returned object mapping wasn't in requested hashes" + ); + return HttpResponse::ServiceUnavailable().finish(); + } } let object_fetcher_result = server_params .object_fetcher - .fetch_object(object_mapping) + .fetch_objects(object_mappings.objects) .await; - let object = match object_fetcher_result { - Ok(object) => { - trace!(?hash, size = %object.len(), "Object fetched successfully"); - object + let objects = match object_fetcher_result { + Ok(objects) => { + trace!( + ?hashes, + count = %objects.len(), + sizes = ?objects.iter().map(|object| object.len()), + "Objects fetched successfully" + ); + objects } Err(err) => { - error!(?hash, ?err, "Failed to fetch object"); + error!(?hashes, ?err, "Failed to fetch objects"); return HttpResponse::ServiceUnavailable().finish(); } }; + // TODO: return a multi-part response, with one part per object HttpResponse::Ok() .content_type("application/octet-stream") - .body(object) + .body(objects.concat()) } /// Starts the DSN object HTTP server. @@ -103,7 +125,7 @@ where HttpServer::new(move || { App::new() .app_data(web::Data::new(server_params.clone())) - .route("/data/{hash}", web::get().to(serve_object::)) + .route("/data/{hashes}", web::get().to(serve_object::)) }) .bind(http_endpoint)? .run() diff --git a/crates/subspace-gateway/src/main.rs b/crates/subspace-gateway/src/main.rs index 65ae756983..70856c154a 100644 --- a/crates/subspace-gateway/src/main.rs +++ b/crates/subspace-gateway/src/main.rs @@ -1,5 +1,7 @@ //! Subspace gateway implementation. 
+#![feature(iterator_try_collect)] + mod commands; mod node_client; mod piece_getter; diff --git a/shared/subspace-data-retrieval/src/object_fetcher.rs b/shared/subspace-data-retrieval/src/object_fetcher.rs index 1264b9a613..84e919be8c 100644 --- a/shared/subspace-data-retrieval/src/object_fetcher.rs +++ b/shared/subspace-data-retrieval/src/object_fetcher.rs @@ -22,7 +22,7 @@ use parity_scale_codec::{Compact, CompactLen, Decode, Encode}; use std::sync::Arc; use subspace_archiving::archiver::{Segment, SegmentItem}; use subspace_core_primitives::hashes::{blake3_hash, Blake3Hash}; -use subspace_core_primitives::objects::GlobalObject; +use subspace_core_primitives::objects::{GlobalObject, GlobalObjectMapping}; use subspace_core_primitives::pieces::{Piece, PieceIndex, RawRecord}; use subspace_core_primitives::segments::{RecordedHistorySegment, SegmentIndex}; use subspace_erasure_coding::ErasureCoding; @@ -193,74 +193,84 @@ where } } - /// Assemble the object in `mapping` by fetching necessary pieces using the piece getter, and - /// putting the object's bytes together. + /// Assemble the objects in `mapping` by fetching necessary pieces using the piece getter, and + /// putting the objects' bytes together. /// - /// Checks the object's hash to make sure the correct bytes are returned. - pub async fn fetch_object(&self, mapping: GlobalObject) -> Result, Error> { - let GlobalObject { - hash, - piece_index, - offset, - } = mapping; + /// Checks the objects' hashes to make sure the correct bytes are returned. + pub async fn fetch_objects( + &self, + mappings: GlobalObjectMapping, + ) -> Result>, Error> { + let mut objects = Vec::with_capacity(mappings.objects().len()); + + // TODO: sort mappings in piece index order, and keep pieces until they're no longer needed + for &mapping in mappings.objects() { + let GlobalObject { + hash, + piece_index, + offset, + } = mapping; + + // Validate parameters + if !piece_index.is_source() { + debug!( + ?mapping, + "Invalid piece index for object: must be a source piece", + ); - // Validate parameters - if !piece_index.is_source() { - debug!( - ?mapping, - "Invalid piece index for object: must be a source piece", - ); + // Parity pieces contain effectively random data, and can't be used to fetch objects + return Err(Error::NotSourcePiece { mapping }); + } - // Parity pieces contain effectively random data, and can't be used to fetch objects - return Err(Error::NotSourcePiece { mapping }); - } + if offset >= RawRecord::SIZE as u32 { + debug!( + ?mapping, + RawRecord_SIZE = RawRecord::SIZE, + "Invalid piece offset for object: must be less than the size of a raw record", + ); - if offset >= RawRecord::SIZE as u32 { - debug!( - ?mapping, - RawRecord_SIZE = RawRecord::SIZE, - "Invalid piece offset for object: must be less than the size of a raw record", - ); + return Err(Error::PieceOffsetTooLarge { mapping }); + } - return Err(Error::PieceOffsetTooLarge { mapping }); - } + // Try fast object assembling from individual pieces, + // then regular object assembling from segments + let data = match self.fetch_object_fast(mapping).await? { + Some(data) => data, + None => { + let data = self.fetch_object_regular(mapping).await?; + + debug!( + ?mapping, + len = %data.len(), + "Fetched object using regular object assembling", - // Try fast object assembling from individual pieces, - // then regular object assembling from segments - let data = match self.fetch_object_fast(mapping).await? 
{ - Some(data) => data, - None => { - let data = self.fetch_object_regular(mapping).await?; + ); + data + } + }; + + let data_hash = blake3_hash(&data); + if data_hash != hash { debug!( + ?data_hash, + data_size = %data.len(), ?mapping, - len = %data.len(), - "Fetched object using regular object assembling", - + "Retrieved data doesn't match requested mapping hash" ); + trace!(data = %hex::encode(&data), "Retrieved data"); - data + return Err(Error::InvalidDataHash { + data_hash, + data_size: data.len(), + mapping, + }); } - }; - - let data_hash = blake3_hash(&data); - if data_hash != hash { - debug!( - ?data_hash, - data_size = %data.len(), - ?mapping, - "Retrieved data doesn't match requested mapping hash" - ); - trace!(data = %hex::encode(&data), "Retrieved data"); - return Err(Error::InvalidDataHash { - data_hash, - data_size: data.len(), - mapping, - }); + objects.push(data); } - Ok(data) + Ok(objects) } /// Fast object fetching and assembling where the object doesn't cross piece (super fast) or From e88bb14aa8e36ed0cde273e369b7d95db9dfc165 Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 3 Jan 2025 12:12:28 +1000 Subject: [PATCH 6/6] Explain segment padding better --- shared/subspace-data-retrieval/src/object_fetcher.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/shared/subspace-data-retrieval/src/object_fetcher.rs b/shared/subspace-data-retrieval/src/object_fetcher.rs index 84e919be8c..85800987ef 100644 --- a/shared/subspace-data-retrieval/src/object_fetcher.rs +++ b/shared/subspace-data-retrieval/src/object_fetcher.rs @@ -30,8 +30,9 @@ use tracing::{debug, trace, warn}; /// The maximum amount of segment padding. /// -/// This is the difference between the compact encoding of lengths 1 to 63, and the compact -/// encoding of lengths 2^14 to 2^30 - 1. +/// This is the difference between the lengths of the compact encodings of the minimum and maximum +/// block sizes, in any domain. As of January 2025, the minimum block size is (potentially) 63 or +/// less, and the maximum block size is in the range 2^14 to 2^30 - 1. /// pub const MAX_SEGMENT_PADDING: usize = 3;
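
For reference, a minimal client sketch for the batched `/data/{hashes}` endpoint added in
PATCH 5, assuming a gateway listening at http://127.0.0.1:3000, two placeholder mapping
hashes, and the reqwest/tokio/anyhow crates (none of these are pinned by this series). As
the TODO in serve_object() notes, the response body is currently the fetched objects
concatenated, so a caller must already know the individual object lengths to split it:

    // Fetch two objects in one round trip from the batched HTTP endpoint.
    // Assumed Cargo deps: tokio = { version = "1", features = ["full"] },
    // reqwest = "0.12", anyhow = "1".
    use anyhow::Result;

    #[tokio::main]
    async fn main() -> Result<()> {
        // Hypothetical hex-encoded Blake3 mapping hashes; real values come from
        // the RPC object mapping subscription or the indexer service.
        let hashes = ["00".repeat(32), "11".repeat(32)];

        // Multiple hashes are separated by `+`, matching serve_object() above.
        let url = format!("http://127.0.0.1:3000/data/{}", hashes.join("+"));

        // The body is the concatenation of all requested objects.
        let body = reqwest::get(&url).await?.error_for_status()?.bytes().await?;
        println!("fetched {} bytes for {} objects", body.len(), hashes.len());
        Ok(())
    }

The RPC gateway takes the same batched shape after this series: ObjectFetcher::fetch_objects()
accepts a whole GlobalObjectMapping and returns one Vec<u8> per mapping, with hash
verification done inside the fetcher rather than by each caller.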