Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update gateway HTTP server object mapping format #3334

Merged
merged 6 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 2 additions & 15 deletions crates/sc-consensus-subspace-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ use std::sync::{Arc, Weak};
use std::time::Duration;
use subspace_archiving::archiver::NewArchivedSegment;
use subspace_core_primitives::hashes::Blake3Hash;
use subspace_core_primitives::objects::GlobalObjectMapping;
use subspace_core_primitives::objects::{GlobalObjectMapping, ObjectMappingResponse};
use subspace_core_primitives::pieces::{Piece, PieceIndex};
use subspace_core_primitives::segments::{HistorySize, SegmentHeader, SegmentIndex};
use subspace_core_primitives::solutions::Solution;
use subspace_core_primitives::{BlockHash, BlockNumber, PublicKey, SlotNumber};
use subspace_core_primitives::{BlockHash, PublicKey, SlotNumber};
use subspace_erasure_coding::ErasureCoding;
use subspace_farmer_components::FarmerProtocolInfo;
use subspace_kzg::Kzg;
Expand Down Expand Up @@ -223,19 +223,6 @@ impl CachedArchivedSegment {
}
}

/// Response to object mapping subscription, including a block height.
/// Large responses are batched, so the block height can be repeated in different responses.
#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ObjectMappingResponse {
/// The block number that the object mapping is from.
pub block_number: BlockNumber,

/// The object mappings.
#[serde(flatten)]
pub objects: GlobalObjectMapping,
}

/// Subspace RPC configuration
pub struct SubspaceRpcConfig<Client, SO, AS>
where
Expand Down
15 changes: 15 additions & 0 deletions crates/subspace-core-primitives/src/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ extern crate alloc;

use crate::hashes::Blake3Hash;
use crate::pieces::PieceIndex;
use crate::BlockNumber;
#[cfg(not(feature = "std"))]
use alloc::vec::Vec;
use core::default::Default;
Expand Down Expand Up @@ -172,3 +173,17 @@ impl GlobalObjectMapping {
}
}
}

/// Response to object mapping subscription, including a block height.
/// Large responses are batched, so the block height can be repeated in different responses.
#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))]
pub struct ObjectMappingResponse {
/// The block number that the object mapping is from.
pub block_number: BlockNumber,

/// The object mappings.
#[cfg_attr(feature = "serde", serde(flatten))]
pub objects: GlobalObjectMapping,
}
Comment on lines +177 to +189
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think "Response" belongs to the core primitives, this is very high-level much application level stuff. We have a separate crate for RPC primitives, would that be a better place for it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It’s used as an RPC response, then as an RPC and HTTP argument, so that makes sense. I’ll move it next time we change that code.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think of subspace-core-protocol as something third-party would have to have in order to build an alternative client. The bare minimum that is mandatory in all cases regardless of what the rest of the implementation looks like.

RPC stuff is much higher level than that, might look different or be missing altogether in some clients, etc.

41 changes: 3 additions & 38 deletions crates/subspace-gateway-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ use jsonrpsee::proc_macros::rpc;
use jsonrpsee::types::{ErrorObject, ErrorObjectOwned};
use std::fmt;
use std::ops::{Deref, DerefMut};
use subspace_core_primitives::hashes::{blake3_hash, Blake3Hash};
use subspace_core_primitives::objects::GlobalObjectMapping;
use subspace_data_retrieval::object_fetcher::{self, ObjectFetcher};
use subspace_data_retrieval::piece_getter::PieceGetter;
use tracing::{debug, error, trace};
use tracing::{debug, error};

const SUBSPACE_ERROR: i32 = 9000;

Expand All @@ -33,17 +32,6 @@ pub enum Error {
/// The object fetcher failed.
#[error(transparent)]
ObjectFetcherError(#[from] object_fetcher::Error),

/// The returned object data did not match the hash in the mapping.
#[error(
"Invalid object hash, mapping had {mapping_hash:?}, but fetched data had {data_hash:?}"
)]
InvalidObjectHash {
/// The expected hash from the mapping.
mapping_hash: Blake3Hash,
/// The actual hash of the returned object data.
data_hash: Blake3Hash,
},
}

impl From<Error> for ErrorObjectOwned {
Expand Down Expand Up @@ -144,31 +132,8 @@ where
return Err(Error::TooManyMappings { count });
}

let mut objects = Vec::with_capacity(count);
// TODO: fetch concurrently
for mapping in mappings.objects() {
let data = self
.object_fetcher
.fetch_object(mapping.piece_index, mapping.offset)
.await?;

let data_hash = blake3_hash(&data);
if data_hash != mapping.hash {
error!(
?data_hash,
data_size = %data.len(),
?mapping.hash,
"Retrieved data did not match mapping hash",
);
trace!(data = %hex::encode(data), "Retrieved data");
return Err(Error::InvalidObjectHash {
mapping_hash: mapping.hash,
data_hash,
});
}

objects.push(data.into());
}
let objects = self.object_fetcher.fetch_objects(mappings).await?;
let objects = objects.into_iter().map(HexData::from).collect();

Ok(objects)
}
Expand Down
128 changes: 59 additions & 69 deletions crates/subspace-gateway/src/commands/http/server.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
//! HTTP server which fetches objects from the DSN based on a hash, using a mapping indexer service.

use actix_web::{web, App, HttpResponse, HttpServer, Responder};
use serde::{Deserialize, Deserializer, Serialize};
use std::default::Default;
use std::sync::Arc;
use subspace_core_primitives::hashes::{blake3_hash, Blake3Hash};
use subspace_core_primitives::pieces::PieceIndex;
use subspace_core_primitives::BlockNumber;
use subspace_core_primitives::hashes::Blake3Hash;
use subspace_core_primitives::objects::ObjectMappingResponse;
use subspace_data_retrieval::object_fetcher::ObjectFetcher;
use subspace_data_retrieval::piece_getter::PieceGetter;
use tracing::{debug, error, trace};
Expand All @@ -21,108 +18,101 @@ where
pub(crate) http_endpoint: String,
}

/// Object mapping format from the indexer service.
#[derive(Serialize, Deserialize, Debug, Default)]
#[serde(rename_all = "camelCase")]
struct ObjectMapping {
hash: Blake3Hash,
piece_index: PieceIndex,
piece_offset: u32,
#[serde(deserialize_with = "string_to_u32")]
block_number: BlockNumber,
}

/// Utility function to deserialize a JSON string into a u32.
fn string_to_u32<'de, D>(deserializer: D) -> Result<u32, D::Error>
where
D: Deserializer<'de>,
{
let s: String = Deserialize::deserialize(deserializer)?;
s.parse::<u32>().map_err(serde::de::Error::custom)
}

/// Requests an object mapping with `hash` from the indexer service.
async fn request_object_mapping(endpoint: &str, hash: Blake3Hash) -> anyhow::Result<ObjectMapping> {
/// Requests the object mappings for `hashes` from the indexer service.
/// Multiple hashes are separated by `+`.
async fn request_object_mapping(
endpoint: &str,
hashes: &Vec<Blake3Hash>,
) -> anyhow::Result<ObjectMappingResponse> {
let client = reqwest::Client::new();
let object_mappings_url = format!("{}/objects/{}", endpoint, hex::encode(hash));
let hash_list = hashes.iter().map(hex::encode).collect::<Vec<_>>();
let object_mappings_url = format!("{}/objects/{}", endpoint, hash_list.join("+"));

debug!(?hash, ?object_mappings_url, "Requesting object mapping...");
debug!(
?hashes,
?object_mappings_url,
"Requesting object mappings..."
);

let response = client.get(&object_mappings_url).send().await?.json().await;

let response = client
.get(&object_mappings_url)
.send()
.await?
.json::<ObjectMapping>()
.await;
match &response {
Ok(json) => {
trace!(?hash, ?json, "Received object mapping");
trace!(?hashes, ?json, "Received object mappings");
}
Err(err) => {
error!(?hash, ?err, ?object_mappings_url, "Request failed");
error!(?hashes, ?err, ?object_mappings_url, "Request failed");
}
}

response.map_err(|err| err.into())
}

/// Fetches a DSN object with `hash`, using the mapping indexer service.
/// Fetches the DSN objects with `hashes`, using the mapping indexer service.
/// Multiple hashes are separated by `+`.
async fn serve_object<PG>(
hash: web::Path<Blake3Hash>,
hashes: web::Path<String>,
additional_data: web::Data<Arc<ServerParameters<PG>>>,
) -> impl Responder
where
PG: PieceGetter + Send + Sync + 'static,
{
let server_params = additional_data.into_inner();
let hash = hash.into_inner();
let hashes = hashes.into_inner();
let hashes = hashes
.split('+')
.map(|s| {
let mut hash = Blake3Hash::default();
hex::decode_to_slice(s, hash.as_mut()).map(|()| hash)
})
.try_collect::<Vec<_>>();

let Ok(hashes) = hashes else {
return HttpResponse::BadRequest().finish();
};

let Ok(object_mapping) = request_object_mapping(&server_params.indexer_endpoint, hash).await
let Ok(object_mappings) =
request_object_mapping(&server_params.indexer_endpoint, &hashes).await
else {
return HttpResponse::BadRequest().finish();
};

if object_mapping.hash != hash {
error!(
?object_mapping,
?hash,
"Returned object mapping doesn't match requested hash"
);
return HttpResponse::ServiceUnavailable().finish();
for object_mapping in object_mappings.objects.objects() {
if !hashes.contains(&object_mapping.hash) {
error!(
?object_mapping,
?hashes,
"Returned object mapping wasn't in requested hashes"
);
return HttpResponse::ServiceUnavailable().finish();
}
}

let object_fetcher_result = server_params
.object_fetcher
.fetch_object(object_mapping.piece_index, object_mapping.piece_offset)
.fetch_objects(object_mappings.objects)
.await;

let object = match object_fetcher_result {
Ok(object) => {
trace!(?hash, size = %object.len(), "Object fetched successfully");

let data_hash = blake3_hash(&object);
if data_hash != hash {
error!(
?data_hash,
data_size = %object.len(),
?hash,
"Retrieved data doesn't match requested mapping hash"
);
trace!(data = %hex::encode(object), "Retrieved data");
return HttpResponse::ServiceUnavailable().finish();
}

object
let objects = match object_fetcher_result {
Ok(objects) => {
trace!(
?hashes,
count = %objects.len(),
sizes = ?objects.iter().map(|object| object.len()),
"Objects fetched successfully"
);
objects
}
Err(err) => {
error!(?hash, ?err, "Failed to fetch object");
error!(?hashes, ?err, "Failed to fetch objects");
return HttpResponse::ServiceUnavailable().finish();
}
};

// TODO: return a multi-part response, with one part per object
HttpResponse::Ok()
.content_type("application/octet-stream")
.body(object)
.body(objects.concat())
}

/// Starts the DSN object HTTP server.
Expand All @@ -135,7 +125,7 @@ where
HttpServer::new(move || {
App::new()
.app_data(web::Data::new(server_params.clone()))
.route("/data/{hash}", web::get().to(serve_object::<PG>))
.route("/data/{hashes}", web::get().to(serve_object::<PG>))
})
.bind(http_endpoint)?
.run()
Expand Down
2 changes: 2 additions & 0 deletions crates/subspace-gateway/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Subspace gateway implementation.

#![feature(iterator_try_collect)]

mod commands;
mod node_client;
mod piece_getter;
Expand Down
1 change: 1 addition & 0 deletions shared/subspace-data-retrieval/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ include = [
anyhow = "1.0.89"
async-trait = "0.1.83"
futures = "0.3.31"
hex = "0.4.3"
parity-scale-codec = { version = "3.6.12", features = ["derive"] }
subspace-archiving = { version = "0.1.0", path = "../../crates/subspace-archiving" }
subspace-core-primitives = { version = "0.1.0", path = "../../crates/subspace-core-primitives" }
Expand Down
Loading
Loading