Commit 1f39783

Merge pull request #2979 from autonomys/obj-map-rpc
Add RPC subscription for piece object mappings
2 parents 90bb050 + 38d5f48 commit 1f39783

3 files changed: +94 -8 lines changed

crates/sc-consensus-subspace-rpc/src/lib.rs

Lines changed: 39 additions & 1 deletion
@@ -20,7 +20,7 @@
 #![feature(try_blocks)]
 
 use futures::channel::mpsc;
-use futures::{future, FutureExt, StreamExt};
+use futures::{future, stream, FutureExt, StreamExt};
 use jsonrpsee::core::async_trait;
 use jsonrpsee::proc_macros::rpc;
 use jsonrpsee::types::{ErrorObject, ErrorObjectOwned};
@@ -58,6 +58,7 @@ use std::sync::{Arc, Weak};
 use std::time::Duration;
 use subspace_archiving::archiver::NewArchivedSegment;
 use subspace_core_primitives::crypto::kzg::Kzg;
+use subspace_core_primitives::objects::GlobalObject;
 use subspace_core_primitives::{
     BlockHash, HistorySize, Piece, PieceIndex, PublicKey, SegmentHeader, SegmentIndex, SlotNumber,
     Solution,
@@ -155,6 +156,16 @@ pub trait SubspaceRpcApi {
 
     #[method(name = "subspace_lastSegmentHeaders")]
     async fn last_segment_headers(&self, limit: u64) -> Result<Vec<Option<SegmentHeader>>, Error>;
+
+    /// Block/transaction archived object mappings subscription
+    #[subscription(
+        name = "subspace_subscribeArchivedObjectMappings" => "subspace_archived_object_mappings",
+        unsubscribe = "subspace_unsubscribeArchivedObjectMappings",
+        item = GlobalObject,
+    )]
+    fn subscribe_archived_object_mappings(&self);
+
+    // TODO: add a method for recent/any object mappings based on a list of IDs, piece indexes, or segment indexes
 }
 
 #[derive(Default)]
@@ -703,6 +714,7 @@
         Ok(())
     }
 
+    // Note: this RPC uses the cached archived segment, which is only updated by archived segment subscriptions
    fn piece(&self, requested_piece_index: PieceIndex) -> Result<Option<Piece>, Error> {
         self.deny_unsafe.check_if_safe()?;
 
@@ -807,4 +819,30 @@
 
         Ok(last_segment_headers)
     }
+
+    // TODO:
+    // - the number of object mappings in each segment can be very large (hundreds or thousands).
+    //   To avoid RPC connection failures, limit the number of mappings returned in each response,
+    //   or the number of in-flight responses.
+    fn subscribe_archived_object_mappings(&self, pending: PendingSubscriptionSink) {
+        // The genesis segment isn't included in this stream. In other methods we recreate it as the first segment,
+        // but there aren't any mappings in it, so we don't need to recreate it as part of this subscription.
+
+        let stream = self
+            .archived_segment_notification_stream
+            .subscribe()
+            .flat_map(|archived_segment_notification| {
+                let objects = archived_segment_notification
+                    .archived_segment
+                    .global_object_mappings();
+
+                stream::iter(objects)
+            });
+
+        self.subscription_executor.spawn(
+            "subspace-archived-object-mappings-subscription",
+            Some("rpc"),
+            pipe_from_stream(pending, stream).boxed(),
+        );
+    }
 }
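
The new subscription can be exercised from any jsonrpsee-compatible client. Below is a minimal client-side sketch, not part of this commit: it assumes a node whose WebSocket RPC listens on ws://127.0.0.1:9944, the jsonrpsee ws-client feature plus tokio, and subspace-core-primitives built with its serde feature so GlobalObject can be deserialized. The method names come from the #[subscription] attribute above.

use futures::StreamExt;
use jsonrpsee::core::client::SubscriptionClientT;
use jsonrpsee::rpc_params;
use jsonrpsee::ws_client::WsClientBuilder;
use subspace_core_primitives::objects::GlobalObject;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Assumed endpoint; substitute the node's actual WebSocket RPC address.
    let client = WsClientBuilder::default().build("ws://127.0.0.1:9944").await?;

    // Subscribe/unsubscribe names match the #[subscription] attribute on SubspaceRpcApi.
    let mut mappings = client
        .subscribe::<GlobalObject, _>(
            "subspace_subscribeArchivedObjectMappings",
            rpc_params![],
            "subspace_unsubscribeArchivedObjectMappings",
        )
        .await?;

    // Each notification carries one archived object mapping.
    while let Some(mapping) = mappings.next().await {
        println!("archived object mapping: {:?}", mapping?);
    }

    Ok(())
}

Note that mappings are only streamed for segments archived after the subscription is opened; as the handler comment says, the genesis segment carries no mappings, so it does not need to be replayed.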

crates/subspace-archiving/src/archiver.rs

Lines changed: 29 additions & 1 deletion
@@ -34,7 +34,7 @@ use rayon::prelude::*;
 use subspace_core_primitives::crypto::kzg::{Commitment, Kzg, Witness};
 use subspace_core_primitives::crypto::{blake3_254_hash_to_scalar, Scalar};
 use subspace_core_primitives::objects::{
-    BlockObject, BlockObjectMapping, PieceObject, PieceObjectMapping,
+    BlockObject, BlockObjectMapping, GlobalObject, PieceObject, PieceObjectMapping,
 };
 use subspace_core_primitives::{
     ArchivedBlockProgress, ArchivedHistorySegment, Blake3Hash, BlockNumber, LastArchivedBlock,
@@ -184,6 +184,34 @@ pub struct NewArchivedSegment {
     pub object_mapping: Vec<PieceObjectMapping>,
 }
 
+impl NewArchivedSegment {
+    /// Returns all the object mappings in this archived segment as a lazy iterator.
+    pub fn global_object_mappings(&self) -> impl Iterator<Item = GlobalObject> + 'static {
+        // Save memory by only returning the necessary parts of NewArchivedSegment
+        let object_mapping = self.object_mapping.clone();
+        let piece_indexes = self
+            .segment_header
+            .segment_index()
+            .segment_piece_indexes_source_first();
+
+        // Iterate through the object mapping vector for each piece
+        object_mapping
+            .into_iter()
+            .zip(piece_indexes)
+            .flat_map(|(piece_mappings, piece_index)| {
+                // And then through each individual object mapping in the piece
+                piece_mappings
+                    .objects
+                    .into_iter()
+                    .map(move |piece_object| GlobalObject::V0 {
+                        piece_index,
+                        offset: piece_object.offset(),
+                        hash: piece_object.hash(),
+                    })
+            })
+    }
+}
+
 /// Archiver instantiation error
 #[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
 #[cfg_attr(feature = "thiserror", derive(thiserror::Error))]
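
As a usage note (not part of the diff): a consumer that already holds a NewArchivedSegment, such as the RPC handler above, can materialise the lazy iterator into the GlobalObjectMapping container introduced in objects.rs below. A minimal sketch, assuming the segment comes from the existing Archiver::add_block API:

use subspace_archiving::archiver::NewArchivedSegment;
use subspace_core_primitives::objects::GlobalObjectMapping;

/// Collects every object mapping in an archived segment into the owned
/// `GlobalObjectMapping` container (see objects.rs below). The segment is
/// assumed to come from `Archiver::add_block`, which is unchanged by this commit.
fn collect_segment_mappings(segment: &NewArchivedSegment) -> GlobalObjectMapping {
    GlobalObjectMapping {
        objects: segment.global_object_mappings().collect(),
    }
}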

crates/subspace-core-primitives/src/objects.rs

Lines changed: 26 additions & 6 deletions
@@ -83,7 +83,7 @@ pub struct BlockObjectMapping {
     pub objects: Vec<BlockObject>,
 }
 
-/// Object stored inside of the block
+/// Object stored inside of the piece
 #[derive(Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd, Hash, Encode, Decode, TypeInfo)]
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 #[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))]
@@ -95,7 +95,7 @@ pub enum PieceObject {
         /// Object hash
         hash: Blake3Hash,
         // TODO: This is a raw record offset, not a regular one
-        /// Offset of the object
+        /// Offset of the object in that piece
         offset: u32,
     },
 }
@@ -121,11 +121,11 @@ impl PieceObject {
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 #[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))]
 pub struct PieceObjectMapping {
-    /// Objects stored inside of the block
+    /// Objects stored inside of the piece
     pub objects: Vec<PieceObject>,
 }
 
-/// Object stored inside in the history of the blockchain
+/// Object stored in the history of the blockchain
 #[derive(Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd, Hash, Encode, Decode, TypeInfo)]
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 #[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))]
@@ -134,10 +134,13 @@ pub enum GlobalObject {
     #[codec(index = 0)]
     #[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))]
     V0 {
-        /// Piece index where object is contained (at least its beginning, might not fit fully)
+        /// Piece index where object is contained (at least its beginning, might not fit fully).
+        /// The index and offset must be first, so that the Ord derives are valid.
         piece_index: PieceIndex,
-        /// Offset of the object
+        /// Offset of the object in that piece
         offset: u32,
+        /// Object hash
+        hash: Blake3Hash,
     },
 }
 
@@ -155,4 +158,21 @@ impl GlobalObject {
             Self::V0 { offset, .. } => *offset,
         }
     }
+
+    /// Object hash
+    pub fn hash(&self) -> Blake3Hash {
+        match self {
+            Self::V0 { hash, .. } => *hash,
+        }
+    }
+}
+
+/// Mapping of objects stored in the history of the blockchain
+#[derive(Debug, Default, Clone, PartialEq, Eq, Ord, PartialOrd, Hash, Encode, Decode, TypeInfo)]
+#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
+#[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))]
+pub struct GlobalObjectMapping {
+    /// Objects stored in the history of the blockchain.
+    /// Mappings are ordered by the piece index and offset of the first GlobalObject in objects.
+    pub objects: Vec<GlobalObject>,
 }
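
To illustrate the doc comment about field order, here is a hypothetical sketch (not part of the commit) of how the derived Ord behaves; it assumes PieceIndex implements From<u64> and Blake3Hash implements Default, as provided by subspace-core-primitives:

use subspace_core_primitives::objects::GlobalObject;
use subspace_core_primitives::{Blake3Hash, PieceIndex};

fn main() {
    // Two mappings in the same piece plus one in a later piece, deliberately out of order.
    let mut objects = vec![
        GlobalObject::V0 {
            piece_index: PieceIndex::from(7_u64),
            offset: 10,
            hash: Blake3Hash::default(),
        },
        GlobalObject::V0 {
            piece_index: PieceIndex::from(2_u64),
            offset: 900,
            hash: Blake3Hash::default(),
        },
        GlobalObject::V0 {
            piece_index: PieceIndex::from(2_u64),
            offset: 4,
            hash: Blake3Hash::default(),
        },
    ];

    // The derived Ord compares piece_index, then offset, then hash, because the
    // fields are declared in that order.
    objects.sort();

    let offsets: Vec<u32> = objects.iter().map(|object| object.offset()).collect();
    assert_eq!(offsets, vec![4, 900, 10]);
}

Because piece_index and offset are declared before hash, sorting a batch of mappings groups them by piece and then by position within the piece, which is the ordering the GlobalObjectMapping doc comment describes.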
