Skip to content

Commit 6699fa6

Browse files
committed
Fetch object batches in RPC and HTTP gateway
1 parent 0c0577d commit 6699fa6

File tree

4 files changed

+118
-89
lines changed

4 files changed

+118
-89
lines changed

crates/subspace-gateway-rpc/src/lib.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -132,13 +132,8 @@ where
132132
return Err(Error::TooManyMappings { count });
133133
}
134134

135-
let mut objects = Vec::with_capacity(count);
136-
// TODO: fetch concurrently
137-
for mapping in mappings.objects() {
138-
let data = self.object_fetcher.fetch_object(*mapping).await?;
139-
140-
objects.push(data.into());
141-
}
135+
let objects = self.object_fetcher.fetch_objects(mappings).await?;
136+
let objects = objects.into_iter().map(HexData::from).collect();
142137

143138
Ok(objects)
144139
}

crates/subspace-gateway/src/commands/http/server.rs

Lines changed: 50 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -18,79 +18,101 @@ where
1818
pub(crate) http_endpoint: String,
1919
}
2020

21-
/// Requests the object mapping with `hash` from the indexer service.
21+
/// Requests the object mappings for `hashes` from the indexer service.
22+
/// Multiple hashes are separated by `+`.
2223
async fn request_object_mapping(
2324
endpoint: &str,
24-
hash: Blake3Hash,
25+
hashes: &Vec<Blake3Hash>,
2526
) -> anyhow::Result<ObjectMappingResponse> {
2627
let client = reqwest::Client::new();
27-
let object_mappings_url = format!("{}/objects/{}", endpoint, hex::encode(hash));
28+
let hash_list = hashes.iter().map(hex::encode).collect::<Vec<_>>();
29+
let object_mappings_url = format!("{}/objects/{}", endpoint, hash_list.join("+"));
2830

29-
debug!(?hash, ?object_mappings_url, "Requesting object mapping...");
31+
debug!(
32+
?hashes,
33+
?object_mappings_url,
34+
"Requesting object mappings..."
35+
);
3036

3137
let response = client.get(&object_mappings_url).send().await?.json().await;
3238

3339
match &response {
3440
Ok(json) => {
35-
trace!(?hash, ?json, "Received object mapping");
41+
trace!(?hashes, ?json, "Received object mappings");
3642
}
3743
Err(err) => {
38-
error!(?hash, ?err, ?object_mappings_url, "Request failed");
44+
error!(?hashes, ?err, ?object_mappings_url, "Request failed");
3945
}
4046
}
4147

4248
response.map_err(|err| err.into())
4349
}
4450

45-
/// Fetches a DSN object with `hash`, using the mapping indexer service.
51+
/// Fetches the DSN objects with `hashes`, using the mapping indexer service.
52+
/// Multiple hashes are separated by `+`.
4653
async fn serve_object<PG>(
47-
hash: web::Path<Blake3Hash>,
54+
hashes: web::Path<String>,
4855
additional_data: web::Data<Arc<ServerParameters<PG>>>,
4956
) -> impl Responder
5057
where
5158
PG: PieceGetter + Send + Sync + 'static,
5259
{
5360
let server_params = additional_data.into_inner();
54-
let hash = hash.into_inner();
61+
let hashes = hashes.into_inner();
62+
let hashes = hashes
63+
.split('+')
64+
.map(|s| {
65+
let mut hash = Blake3Hash::default();
66+
hex::decode_to_slice(s, hash.as_mut()).map(|()| hash)
67+
})
68+
.try_collect::<Vec<_>>();
5569

56-
let Ok(object_mapping) = request_object_mapping(&server_params.indexer_endpoint, hash).await
57-
else {
70+
let Ok(hashes) = hashes else {
5871
return HttpResponse::BadRequest().finish();
5972
};
6073

61-
// TODO: fetch multiple objects
62-
let Some(&object_mapping) = object_mapping.objects.objects().first() else {
74+
let Ok(object_mappings) =
75+
request_object_mapping(&server_params.indexer_endpoint, &hashes).await
76+
else {
6377
return HttpResponse::BadRequest().finish();
6478
};
6579

66-
if object_mapping.hash != hash {
67-
error!(
68-
?object_mapping,
69-
?hash,
70-
"Returned object mapping doesn't match requested hash"
71-
);
72-
return HttpResponse::ServiceUnavailable().finish();
80+
for object_mapping in object_mappings.objects.objects() {
81+
if !hashes.contains(&object_mapping.hash) {
82+
error!(
83+
?object_mapping,
84+
?hashes,
85+
"Returned object mapping wasn't in requested hashes"
86+
);
87+
return HttpResponse::ServiceUnavailable().finish();
88+
}
7389
}
7490

7591
let object_fetcher_result = server_params
7692
.object_fetcher
77-
.fetch_object(object_mapping)
93+
.fetch_objects(object_mappings.objects)
7894
.await;
7995

80-
let object = match object_fetcher_result {
81-
Ok(object) => {
82-
trace!(?hash, size = %object.len(), "Object fetched successfully");
83-
object
96+
let objects = match object_fetcher_result {
97+
Ok(objects) => {
98+
trace!(
99+
?hashes,
100+
count = %objects.len(),
101+
sizes = ?objects.iter().map(|object| object.len()),
102+
"Objects fetched successfully"
103+
);
104+
objects
84105
}
85106
Err(err) => {
86-
error!(?hash, ?err, "Failed to fetch object");
107+
error!(?hashes, ?err, "Failed to fetch objects");
87108
return HttpResponse::ServiceUnavailable().finish();
88109
}
89110
};
90111

112+
// TODO: return a multi-part response, with one part per object
91113
HttpResponse::Ok()
92114
.content_type("application/octet-stream")
93-
.body(object)
115+
.body(objects.concat())
94116
}
95117

96118
/// Starts the DSN object HTTP server.
@@ -103,7 +125,7 @@ where
103125
HttpServer::new(move || {
104126
App::new()
105127
.app_data(web::Data::new(server_params.clone()))
106-
.route("/data/{hash}", web::get().to(serve_object::<PG>))
128+
.route("/data/{hashes}", web::get().to(serve_object::<PG>))
107129
})
108130
.bind(http_endpoint)?
109131
.run()

crates/subspace-gateway/src/main.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
//! Subspace gateway implementation.
22
3+
#![feature(iterator_try_collect)]
4+
35
mod commands;
46
mod node_client;
57
mod piece_getter;

shared/subspace-data-retrieval/src/object_fetcher.rs

Lines changed: 64 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use parity_scale_codec::{Compact, CompactLen, Decode, Encode};
2222
use std::sync::Arc;
2323
use subspace_archiving::archiver::{Segment, SegmentItem};
2424
use subspace_core_primitives::hashes::{blake3_hash, Blake3Hash};
25-
use subspace_core_primitives::objects::GlobalObject;
25+
use subspace_core_primitives::objects::{GlobalObject, GlobalObjectMapping};
2626
use subspace_core_primitives::pieces::{Piece, PieceIndex, RawRecord};
2727
use subspace_core_primitives::segments::{RecordedHistorySegment, SegmentIndex};
2828
use subspace_erasure_coding::ErasureCoding;
@@ -193,74 +193,84 @@ where
193193
}
194194
}
195195

196-
/// Assemble the object in `mapping` by fetching necessary pieces using the piece getter, and
197-
/// putting the object's bytes together.
196+
/// Assemble the objects in `mappings` by fetching necessary pieces using the piece getter, and
197+
/// putting the objects' bytes together.
198198
///
199-
/// Checks the object's hash to make sure the correct bytes are returned.
200-
pub async fn fetch_object(&self, mapping: GlobalObject) -> Result<Vec<u8>, Error> {
201-
let GlobalObject {
202-
hash,
203-
piece_index,
204-
offset,
205-
} = mapping;
199+
/// Checks the objects' hashes to make sure the correct bytes are returned.
200+
pub async fn fetch_objects(
201+
&self,
202+
mappings: GlobalObjectMapping,
203+
) -> Result<Vec<Vec<u8>>, Error> {
204+
let mut objects = Vec::with_capacity(mappings.objects().len());
205+
206+
// TODO: sort mappings in piece index order, and keep pieces until they're no longer needed
207+
for &mapping in mappings.objects() {
208+
let GlobalObject {
209+
hash,
210+
piece_index,
211+
offset,
212+
} = mapping;
213+
214+
// Validate parameters
215+
if !piece_index.is_source() {
216+
debug!(
217+
?mapping,
218+
"Invalid piece index for object: must be a source piece",
219+
);
206220

207-
// Validate parameters
208-
if !piece_index.is_source() {
209-
debug!(
210-
?mapping,
211-
"Invalid piece index for object: must be a source piece",
212-
);
221+
// Parity pieces contain effectively random data, and can't be used to fetch objects
222+
return Err(Error::NotSourcePiece { mapping });
223+
}
213224

214-
// Parity pieces contain effectively random data, and can't be used to fetch objects
215-
return Err(Error::NotSourcePiece { mapping });
216-
}
225+
if offset >= RawRecord::SIZE as u32 {
226+
debug!(
227+
?mapping,
228+
RawRecord_SIZE = RawRecord::SIZE,
229+
"Invalid piece offset for object: must be less than the size of a raw record",
230+
);
217231

218-
if offset >= RawRecord::SIZE as u32 {
219-
debug!(
220-
?mapping,
221-
RawRecord_SIZE = RawRecord::SIZE,
222-
"Invalid piece offset for object: must be less than the size of a raw record",
223-
);
232+
return Err(Error::PieceOffsetTooLarge { mapping });
233+
}
224234

225-
return Err(Error::PieceOffsetTooLarge { mapping });
226-
}
235+
// Try fast object assembling from individual pieces,
236+
// then regular object assembling from segments
237+
let data = match self.fetch_object_fast(mapping).await? {
238+
Some(data) => data,
239+
None => {
240+
let data = self.fetch_object_regular(mapping).await?;
241+
242+
debug!(
243+
?mapping,
244+
len = %data.len(),
245+
"Fetched object using regular object assembling",
227246

228-
// Try fast object assembling from individual pieces,
229-
// then regular object assembling from segments
230-
let data = match self.fetch_object_fast(mapping).await? {
231-
Some(data) => data,
232-
None => {
233-
let data = self.fetch_object_regular(mapping).await?;
247+
);
234248

249+
data
250+
}
251+
};
252+
253+
let data_hash = blake3_hash(&data);
254+
if data_hash != hash {
235255
debug!(
256+
?data_hash,
257+
data_size = %data.len(),
236258
?mapping,
237-
len = %data.len(),
238-
"Fetched object using regular object assembling",
239-
259+
"Retrieved data doesn't match requested mapping hash"
240260
);
261+
trace!(data = %hex::encode(&data), "Retrieved data");
241262

242-
data
263+
return Err(Error::InvalidDataHash {
264+
data_hash,
265+
data_size: data.len(),
266+
mapping,
267+
});
243268
}
244-
};
245-
246-
let data_hash = blake3_hash(&data);
247-
if data_hash != hash {
248-
debug!(
249-
?data_hash,
250-
data_size = %data.len(),
251-
?mapping,
252-
"Retrieved data doesn't match requested mapping hash"
253-
);
254-
trace!(data = %hex::encode(&data), "Retrieved data");
255269

256-
return Err(Error::InvalidDataHash {
257-
data_hash,
258-
data_size: data.len(),
259-
mapping,
260-
});
270+
objects.push(data);
261271
}
262272

263-
Ok(data)
273+
Ok(objects)
264274
}
265275

266276
/// Fast object fetching and assembling where the object doesn't cross piece (super fast) or

0 commit comments

Comments
 (0)