Skip to content

Commit

Permalink
Merge pull request #3334 from autonomys/gateway-string-or-int
Browse files Browse the repository at this point in the history
Update gateway HTTP server object mapping format
  • Loading branch information
teor2345 authored Jan 6, 2025
2 parents 0992ef6 + e88bb14 commit f9b1769
Show file tree
Hide file tree
Showing 9 changed files with 299 additions and 322 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 2 additions & 15 deletions crates/sc-consensus-subspace-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ use std::sync::{Arc, Weak};
use std::time::Duration;
use subspace_archiving::archiver::NewArchivedSegment;
use subspace_core_primitives::hashes::Blake3Hash;
use subspace_core_primitives::objects::GlobalObjectMapping;
use subspace_core_primitives::objects::{GlobalObjectMapping, ObjectMappingResponse};
use subspace_core_primitives::pieces::{Piece, PieceIndex};
use subspace_core_primitives::segments::{HistorySize, SegmentHeader, SegmentIndex};
use subspace_core_primitives::solutions::Solution;
use subspace_core_primitives::{BlockHash, BlockNumber, PublicKey, SlotNumber};
use subspace_core_primitives::{BlockHash, PublicKey, SlotNumber};
use subspace_erasure_coding::ErasureCoding;
use subspace_farmer_components::FarmerProtocolInfo;
use subspace_kzg::Kzg;
Expand Down Expand Up @@ -223,19 +223,6 @@ impl CachedArchivedSegment {
}
}

/// Response to object mapping subscription, including a block height.
/// Large responses are batched, so the block height can be repeated in different responses.
#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ObjectMappingResponse {
/// The block number that the object mapping is from.
pub block_number: BlockNumber,

/// The object mappings.
#[serde(flatten)]
pub objects: GlobalObjectMapping,
}

/// Subspace RPC configuration
pub struct SubspaceRpcConfig<Client, SO, AS>
where
Expand Down
15 changes: 15 additions & 0 deletions crates/subspace-core-primitives/src/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ extern crate alloc;

use crate::hashes::Blake3Hash;
use crate::pieces::PieceIndex;
use crate::BlockNumber;
#[cfg(not(feature = "std"))]
use alloc::vec::Vec;
use core::default::Default;
Expand Down Expand Up @@ -172,3 +173,17 @@ impl GlobalObjectMapping {
}
}
}

/// Response to object mapping subscription, including a block height.
/// Large responses are batched, so the block height can be repeated in different responses.
#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))]
pub struct ObjectMappingResponse {
/// The block number that the object mapping is from.
pub block_number: BlockNumber,

/// The object mappings.
#[cfg_attr(feature = "serde", serde(flatten))]
pub objects: GlobalObjectMapping,
}
41 changes: 3 additions & 38 deletions crates/subspace-gateway-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ use jsonrpsee::proc_macros::rpc;
use jsonrpsee::types::{ErrorObject, ErrorObjectOwned};
use std::fmt;
use std::ops::{Deref, DerefMut};
use subspace_core_primitives::hashes::{blake3_hash, Blake3Hash};
use subspace_core_primitives::objects::GlobalObjectMapping;
use subspace_data_retrieval::object_fetcher::{self, ObjectFetcher};
use subspace_data_retrieval::piece_getter::PieceGetter;
use tracing::{debug, error, trace};
use tracing::{debug, error};

const SUBSPACE_ERROR: i32 = 9000;

Expand All @@ -33,17 +32,6 @@ pub enum Error {
/// The object fetcher failed.
#[error(transparent)]
ObjectFetcherError(#[from] object_fetcher::Error),

/// The returned object data did not match the hash in the mapping.
#[error(
"Invalid object hash, mapping had {mapping_hash:?}, but fetched data had {data_hash:?}"
)]
InvalidObjectHash {
/// The expected hash from the mapping.
mapping_hash: Blake3Hash,
/// The actual hash of the returned object data.
data_hash: Blake3Hash,
},
}

impl From<Error> for ErrorObjectOwned {
Expand Down Expand Up @@ -144,31 +132,8 @@ where
return Err(Error::TooManyMappings { count });
}

let mut objects = Vec::with_capacity(count);
// TODO: fetch concurrently
for mapping in mappings.objects() {
let data = self
.object_fetcher
.fetch_object(mapping.piece_index, mapping.offset)
.await?;

let data_hash = blake3_hash(&data);
if data_hash != mapping.hash {
error!(
?data_hash,
data_size = %data.len(),
?mapping.hash,
"Retrieved data did not match mapping hash",
);
trace!(data = %hex::encode(data), "Retrieved data");
return Err(Error::InvalidObjectHash {
mapping_hash: mapping.hash,
data_hash,
});
}

objects.push(data.into());
}
let objects = self.object_fetcher.fetch_objects(mappings).await?;
let objects = objects.into_iter().map(HexData::from).collect();

Ok(objects)
}
Expand Down
128 changes: 59 additions & 69 deletions crates/subspace-gateway/src/commands/http/server.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
//! HTTP server which fetches objects from the DSN based on a hash, using a mapping indexer service.
use actix_web::{web, App, HttpResponse, HttpServer, Responder};
use serde::{Deserialize, Deserializer, Serialize};
use std::default::Default;
use std::sync::Arc;
use subspace_core_primitives::hashes::{blake3_hash, Blake3Hash};
use subspace_core_primitives::pieces::PieceIndex;
use subspace_core_primitives::BlockNumber;
use subspace_core_primitives::hashes::Blake3Hash;
use subspace_core_primitives::objects::ObjectMappingResponse;
use subspace_data_retrieval::object_fetcher::ObjectFetcher;
use subspace_data_retrieval::piece_getter::PieceGetter;
use tracing::{debug, error, trace};
Expand All @@ -21,108 +18,101 @@ where
pub(crate) http_endpoint: String,
}

/// Object mapping format from the indexer service.
#[derive(Serialize, Deserialize, Debug, Default)]
#[serde(rename_all = "camelCase")]
struct ObjectMapping {
hash: Blake3Hash,
piece_index: PieceIndex,
piece_offset: u32,
#[serde(deserialize_with = "string_to_u32")]
block_number: BlockNumber,
}

/// Utility function to deserialize a JSON string into a u32.
fn string_to_u32<'de, D>(deserializer: D) -> Result<u32, D::Error>
where
D: Deserializer<'de>,
{
let s: String = Deserialize::deserialize(deserializer)?;
s.parse::<u32>().map_err(serde::de::Error::custom)
}

/// Requests an object mapping with `hash` from the indexer service.
async fn request_object_mapping(endpoint: &str, hash: Blake3Hash) -> anyhow::Result<ObjectMapping> {
/// Requests the object mappings for `hashes` from the indexer service.
/// Multiple hashes are separated by `+`.
async fn request_object_mapping(
endpoint: &str,
hashes: &Vec<Blake3Hash>,
) -> anyhow::Result<ObjectMappingResponse> {
let client = reqwest::Client::new();
let object_mappings_url = format!("{}/objects/{}", endpoint, hex::encode(hash));
let hash_list = hashes.iter().map(hex::encode).collect::<Vec<_>>();
let object_mappings_url = format!("{}/objects/{}", endpoint, hash_list.join("+"));

debug!(?hash, ?object_mappings_url, "Requesting object mapping...");
debug!(
?hashes,
?object_mappings_url,
"Requesting object mappings..."
);

let response = client.get(&object_mappings_url).send().await?.json().await;

let response = client
.get(&object_mappings_url)
.send()
.await?
.json::<ObjectMapping>()
.await;
match &response {
Ok(json) => {
trace!(?hash, ?json, "Received object mapping");
trace!(?hashes, ?json, "Received object mappings");
}
Err(err) => {
error!(?hash, ?err, ?object_mappings_url, "Request failed");
error!(?hashes, ?err, ?object_mappings_url, "Request failed");
}
}

response.map_err(|err| err.into())
}

/// Fetches a DSN object with `hash`, using the mapping indexer service.
/// Fetches the DSN objects with `hashes`, using the mapping indexer service.
/// Multiple hashes are separated by `+`.
async fn serve_object<PG>(
hash: web::Path<Blake3Hash>,
hashes: web::Path<String>,
additional_data: web::Data<Arc<ServerParameters<PG>>>,
) -> impl Responder
where
PG: PieceGetter + Send + Sync + 'static,
{
let server_params = additional_data.into_inner();
let hash = hash.into_inner();
let hashes = hashes.into_inner();
let hashes = hashes
.split('+')
.map(|s| {
let mut hash = Blake3Hash::default();
hex::decode_to_slice(s, hash.as_mut()).map(|()| hash)
})
.try_collect::<Vec<_>>();

let Ok(hashes) = hashes else {
return HttpResponse::BadRequest().finish();
};

let Ok(object_mapping) = request_object_mapping(&server_params.indexer_endpoint, hash).await
let Ok(object_mappings) =
request_object_mapping(&server_params.indexer_endpoint, &hashes).await
else {
return HttpResponse::BadRequest().finish();
};

if object_mapping.hash != hash {
error!(
?object_mapping,
?hash,
"Returned object mapping doesn't match requested hash"
);
return HttpResponse::ServiceUnavailable().finish();
for object_mapping in object_mappings.objects.objects() {
if !hashes.contains(&object_mapping.hash) {
error!(
?object_mapping,
?hashes,
"Returned object mapping wasn't in requested hashes"
);
return HttpResponse::ServiceUnavailable().finish();
}
}

let object_fetcher_result = server_params
.object_fetcher
.fetch_object(object_mapping.piece_index, object_mapping.piece_offset)
.fetch_objects(object_mappings.objects)
.await;

let object = match object_fetcher_result {
Ok(object) => {
trace!(?hash, size = %object.len(), "Object fetched successfully");

let data_hash = blake3_hash(&object);
if data_hash != hash {
error!(
?data_hash,
data_size = %object.len(),
?hash,
"Retrieved data doesn't match requested mapping hash"
);
trace!(data = %hex::encode(object), "Retrieved data");
return HttpResponse::ServiceUnavailable().finish();
}

object
let objects = match object_fetcher_result {
Ok(objects) => {
trace!(
?hashes,
count = %objects.len(),
sizes = ?objects.iter().map(|object| object.len()),
"Objects fetched successfully"
);
objects
}
Err(err) => {
error!(?hash, ?err, "Failed to fetch object");
error!(?hashes, ?err, "Failed to fetch objects");
return HttpResponse::ServiceUnavailable().finish();
}
};

// TODO: return a multi-part response, with one part per object
HttpResponse::Ok()
.content_type("application/octet-stream")
.body(object)
.body(objects.concat())
}

/// Starts the DSN object HTTP server.
Expand All @@ -135,7 +125,7 @@ where
HttpServer::new(move || {
App::new()
.app_data(web::Data::new(server_params.clone()))
.route("/data/{hash}", web::get().to(serve_object::<PG>))
.route("/data/{hashes}", web::get().to(serve_object::<PG>))
})
.bind(http_endpoint)?
.run()
Expand Down
2 changes: 2 additions & 0 deletions crates/subspace-gateway/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Subspace gateway implementation.
#![feature(iterator_try_collect)]

mod commands;
mod node_client;
mod piece_getter;
Expand Down
1 change: 1 addition & 0 deletions shared/subspace-data-retrieval/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ include = [
anyhow = "1.0.89"
async-trait = "0.1.83"
futures = "0.3.31"
hex = "0.4.3"
parity-scale-codec = { version = "3.6.12", features = ["derive"] }
subspace-archiving = { version = "0.1.0", path = "../../crates/subspace-archiving" }
subspace-core-primitives = { version = "0.1.0", path = "../../crates/subspace-core-primitives" }
Expand Down
Loading

0 comments on commit f9b1769

Please sign in to comment.