From a91931e1d58b5df2361324ec618c98727aca659e Mon Sep 17 00:00:00 2001 From: Sergiu Popescu Date: Sun, 30 Nov 2025 17:59:30 +0100 Subject: [PATCH 1/3] feat(iota-indexer): add HistoricalFallbackReader as a high level client to retrieve data for indexer JSON RPC --- crates/iota-indexer/src/errors.rs | 8 +- .../src/historical_fallback/client.rs | 7 +- .../src/historical_fallback/convert.rs | 10 +- .../src/historical_fallback/mod.rs | 1 + .../src/historical_fallback/reader.rs | 523 ++++++++++++++++++ 5 files changed, 536 insertions(+), 13 deletions(-) create mode 100644 crates/iota-indexer/src/historical_fallback/reader.rs diff --git a/crates/iota-indexer/src/errors.rs b/crates/iota-indexer/src/errors.rs index 8b6f9edd4c7..c70db175eca 100644 --- a/crates/iota-indexer/src/errors.rs +++ b/crates/iota-indexer/src/errors.rs @@ -7,7 +7,7 @@ use iota_data_ingestion_core::IngestionError; use iota_json_rpc_api::{error_object_from_rpc, internal_error}; use iota_names::error::IotaNamesError; use iota_types::{ - base_types::ObjectIDParseError, + base_types::{ObjectID, ObjectIDParseError, SequenceNumber}, error::{IotaError, IotaObjectResponseError, UserInputError}, }; use jsonrpsee::{core::ClientError as RpcError, types::ErrorObjectOwned}; @@ -153,6 +153,12 @@ pub enum IndexerError { #[error("Transaction dependencies have not been indexed")] TransactionDependenciesNotIndexed, + + #[error("historical fallback object not found: id {object_id}, version {version}")] + HistoricalFallbackObjectNotFound { + object_id: ObjectID, + version: SequenceNumber, + }, } pub type IndexerResult = Result; diff --git a/crates/iota-indexer/src/historical_fallback/client.rs b/crates/iota-indexer/src/historical_fallback/client.rs index ae1ec57fffc..38d85855733 100644 --- a/crates/iota-indexer/src/historical_fallback/client.rs +++ b/crates/iota-indexer/src/historical_fallback/client.rs @@ -1,6 +1,8 @@ // Copyright (c) 2025 IOTA Stiftung // SPDX-License-Identifier: Apache-2.0 +//! Module containing the client for interacting with the REST API KV server. + use std::str::FromStr; use bytes::Bytes; @@ -26,7 +28,6 @@ use tracing::{error, info, instrument, trace, warn}; use crate::errors::IndexerResult; -#[expect(dead_code)] pub(crate) trait KeyValueStoreClient { async fn multi_get_transactions( &self, @@ -75,7 +76,6 @@ pub(crate) struct HttpRestKVClient { } impl HttpRestKVClient { - #[expect(dead_code)] pub fn new(base_url: &str) -> IndexerResult { info!("creating HttpRestKVClient with base_url: {}", base_url); @@ -98,7 +98,6 @@ impl HttpRestKVClient { Ok(Url::from_str(joined.as_str())?) } - #[expect(dead_code)] async fn multi_fetch(&self, uris: Vec) -> Vec>> { let len = uris.len(); let fetches = stream::iter(uris.into_iter().map(|url| self.fetch(url))); @@ -138,7 +137,6 @@ where .ok() } -#[expect(dead_code)] fn map_fetch<'a, K>(fetch: (&'a IndexerResult>, &'a K)) -> Option<(&'a Bytes, &'a K)> where K: std::fmt::Debug, @@ -154,7 +152,6 @@ where } } -#[expect(dead_code)] fn deser_check_digest( digest: &D, bytes: &Bytes, diff --git a/crates/iota-indexer/src/historical_fallback/convert.rs b/crates/iota-indexer/src/historical_fallback/convert.rs index 003fdae7ecf..d3aac9461d3 100644 --- a/crates/iota-indexer/src/historical_fallback/convert.rs +++ b/crates/iota-indexer/src/historical_fallback/convert.rs @@ -37,13 +37,13 @@ use crate::{ /// Alias for an [`Object`] fetched from historical fallback storage. /// /// Contains all data needed to reconstruct a [`StoredObject`]. -type HistoricalFallbackObject = Object; +pub(crate) type HistoricalFallbackObject = Object; /// Alias for [`CertifiedCheckpointSummary`] with its [`CheckpointContents`] /// data fetched from historical fallback storage. /// /// Contains all data needed to reconstruct a [`StoredCheckpoint`]. -type HistoricalFallbackCheckpoint = (CertifiedCheckpointSummary, CheckpointContents); +pub(crate) type HistoricalFallbackCheckpoint = (CertifiedCheckpointSummary, CheckpointContents); impl From for StoredObject { fn from(object: HistoricalFallbackObject) -> Self { @@ -81,7 +81,6 @@ pub struct HistoricalFallbackEvents { } impl HistoricalFallbackEvents { - #[expect(dead_code)] pub fn new(events: TransactionEvents, checkpoint_summary: CertifiedCheckpointSummary) -> Self { Self { events, @@ -91,7 +90,6 @@ impl HistoricalFallbackEvents { /// Converts the raw [`TransactionEvents`] into JSON RPC compatible /// [`IotaEvent`]s. - #[expect(dead_code)] pub(crate) async fn into_iota_events( self, package_resolver: Arc>, @@ -122,7 +120,6 @@ pub struct HistoricalFallbackTransaction { } impl HistoricalFallbackTransaction { - #[expect(dead_code)] pub fn new( checkpoint_transaction: CheckpointTransaction, checkpoint_summary: CertifiedCheckpointSummary, @@ -136,8 +133,7 @@ impl HistoricalFallbackTransaction { /// Converts the historical fallback transaction into a /// [`StoredTransaction`]. - #[expect(dead_code)] - async fn into_stored_transaction(self) -> IndexerResult { + pub(crate) async fn into_stored_transaction(self) -> IndexerResult { // StoredTransaction::try_into_iota_transaction_block_response implementation // does not use the `tx_sequence_number`, in this regard it is safe to // hardcode to 0. diff --git a/crates/iota-indexer/src/historical_fallback/mod.rs b/crates/iota-indexer/src/historical_fallback/mod.rs index c6d9f86af69..7d80b359e87 100644 --- a/crates/iota-indexer/src/historical_fallback/mod.rs +++ b/crates/iota-indexer/src/historical_fallback/mod.rs @@ -10,3 +10,4 @@ pub(crate) mod client; pub(crate) mod convert; +pub(crate) mod reader; diff --git a/crates/iota-indexer/src/historical_fallback/reader.rs b/crates/iota-indexer/src/historical_fallback/reader.rs new file mode 100644 index 00000000000..f6bdcc7b46b --- /dev/null +++ b/crates/iota-indexer/src/historical_fallback/reader.rs @@ -0,0 +1,523 @@ +// Copyright (c) 2025 IOTA Stiftung +// SPDX-License-Identifier: Apache-2.0 + +//! Module containing the historical fallback reader implementation. +//! +//! This module provides a high-level client to interact with the historical +//! fallback storage. It enables integration with the +//! [`IndexerReader`](crate::read::IndexerReader) for fallback capabilities when +//! the indexer is unable to fetch data from the database, which is especially +//! useful when pruning is enabled. + +#![expect(dead_code)] + +use std::collections::HashMap; + +use futures::future; +use iota_json_rpc_types::{CheckpointId, IotaEvent}; +use iota_rest_api::CheckpointTransaction; +use iota_types::{ + base_types::{ObjectID, SequenceNumber}, + digests::TransactionDigest, + effects::{TransactionEffects, TransactionEffectsAPI}, + event::EventID, + messages_checkpoint::{CertifiedCheckpointSummary, CheckpointSequenceNumber}, + object::Object, +}; +use itertools::{Itertools, izip}; + +use crate::{ + errors::{IndexerError, IndexerResult}, + historical_fallback::{ + client::{HttpRestKVClient, KeyValueStoreClient}, + convert::{HistoricalFallbackEvents, HistoricalFallbackTransaction}, + }, + models::{ + checkpoints::StoredCheckpoint, objects::StoredObject, transactions::StoredTransaction, + }, + read::PackageResolver, +}; + +/// Represents the Input objects of a transaction. +pub type InputObjects = Vec; +/// Represents the Output objects of a transaction. +pub type OutputObjects = Vec; + +/// A high-level client to interact with the historical fallback storage. +/// +/// Provides convenient methods to fetch data from the historical fallback +/// storage, with automatic conversions into types the Indexer uses when +/// fetching from the database. This enables integration with the +/// [`IndexerReader`](crate::read::IndexerReader) for fallback capabilities +/// when the indexer is unable to fetch data from the database, which is +/// especially useful when pruning is enabled. +pub(crate) struct HistoricalFallbackReader { + /// Client responsible for fetching data from the historical fallback + /// storage through REST API interface. + client: HttpRestKVClient, + // TODO: The package resolver should support fetching an object by ID from fallback storage + // in case the objects table is pruned in the indexer. + // + // Current approach: We use the package resolver to fetch packages by ID from Postgres. + // + // Future requirements: Once the objects table is pruned, the package resolver will need + // to fetch objects by ID from the historical fallback reader. This depends on the range + // scan feature being implemented on the KV REST API side. + package_resolver: PackageResolver, +} + +impl HistoricalFallbackReader { + pub fn new(rest_kv_url: &str, package_resolver: PackageResolver) -> IndexerResult { + let client = HttpRestKVClient::new(rest_kv_url)?; + Ok(Self { + client, + package_resolver, + }) + } + + /// Resolves the input and output objects from a given transaction effects. + pub(crate) async fn resolve_transaction_input_output_objects( + &self, + transaction_effects: &TransactionEffects, + ) -> IndexerResult<(InputObjects, OutputObjects)> { + let input_object_keys = transaction_effects.modified_at_versions(); + + let output_object_keys = transaction_effects + .all_changed_objects() + .into_iter() + .map(|((object_id, version, _object_digest), _owner, _kind)| (object_id, version)) + .collect::>(); + + let (raw_input_objects, raw_output_objects) = tokio::try_join!( + self.client.multi_get_objects(&input_object_keys), + self.client.multi_get_objects(&output_object_keys), + )?; + + let input_objects = raw_input_objects + .into_iter() + .zip(&input_object_keys) + .map(|(object, (object_id, version))| { + object.ok_or_else(|| IndexerError::HistoricalFallbackObjectNotFound { + object_id: *object_id, + version: *version, + }) + }) + .collect::>>()?; + + let output_objects = raw_output_objects + .into_iter() + .zip(&output_object_keys) + .map(|(object, (object_id, version))| { + object.ok_or_else(|| IndexerError::HistoricalFallbackObjectNotFound { + object_id: *object_id, + version: *version, + }) + }) + .collect::>>()?; + + Ok((input_objects, output_objects)) + } + + /// Resolves the checkpoint summaries by the provided transaction digests + pub(crate) async fn resolve_checkpoint_summaries_by_tx_digests( + &self, + tx_digests: &[TransactionDigest], + ) -> IndexerResult> { + let tx_to_checkpoint_seq_num = self + .client + .multi_get_transactions_perpetual_checkpoints(tx_digests) + .await?; + + // deduplicate checkpoint sequence numbers to avoid fetching the same summary + // multiple times. + let unique_seq_nums = tx_to_checkpoint_seq_num + .iter() + .flatten() + .unique() + .copied() + .collect::>(); + + let summaries_by_seq = self + .client + .multi_get_checkpoints_summaries_by_sequence_numbers(&unique_seq_nums) + .await? + .into_iter() + .flatten() + .map(|summary| (summary.sequence_number, summary)) + .collect::>(); + + // map each tx digest to its checkpoint summary + let summaries = tx_digests + .iter() + .zip(tx_to_checkpoint_seq_num) + .filter_map(|(digest, seq_num)| { + let seq = seq_num?; + summaries_by_seq + .get(&seq) + .cloned() + .map(|summary| (*digest, summary)) + }) + .collect(); + + Ok(summaries) + } + + /// Fetches a checkpoint by either a [`CheckpointSequenceNumber`] or + /// [`CheckpointDigest`](iota_types::digests::CheckpointDigest). + pub(crate) async fn checkpoint( + &self, + id: CheckpointId, + ) -> IndexerResult> { + let (summaries, contents) = match id { + CheckpointId::SequenceNumber(sequence_number) => { + let seq_nums = [sequence_number]; + tokio::try_join!( + self.client + .multi_get_checkpoints_summaries_by_sequence_numbers(&seq_nums), + self.client.multi_get_checkpoints_contents(&seq_nums) + )? + } + CheckpointId::Digest(digest) => { + let summaries = self + .client + .multi_get_checkpoints_summaries_by_digests(&[digest]) + .await?; + + let Some(seq_num) = summaries + .first() + .and_then(|summary| summary.as_ref()) + .map(|summary| summary.sequence_number) + else { + return Ok(None); + }; + + let contents = self + .client + .multi_get_checkpoints_contents(&[seq_num]) + .await?; + + (summaries, contents) + } + }; + + let checkpoint = summaries + .into_iter() + .zip(contents) + .next() + .and_then(|(s, c)| Some(StoredCheckpoint::from((s?, c?)))); + + Ok(checkpoint) + } + + /// Fetches multiple checkpoints from the historical fallback storage. + /// + /// Returns checkpoints in paginated form, supporting both ascending and + /// descending order. + /// + /// # Pagination Behavior + /// + /// | cursor | descending | Result | + /// |--------|------------|--------| + /// | `None` | `false` | Starts from checkpoint 0 | + /// | `None` | `true` | Starts from latest checkpoint | + /// | `Some(n)` | `false` | Starts from checkpoint n+1 | + /// | `Some(n)` | `true` | Starts from checkpoint n-1 | + /// + /// # NOTE + /// `StoredCheckpoint.sequence_number` is hardcoded to 0, due to missing + /// data in the historical fallback. It can be derived but the operations + /// could be expensive. Can be added in future iterations. + pub(crate) async fn checkpoints( + &self, + // TODO: The `latest_checkpoint` parameter is a temporary workaround. The KV store + // doesn't currently have a way to determine the latest available checkpoint. + // This can be removed once either: + // - Range scan is implemented on the KV REST API, or + // - The `get_latest_checkpoint()` method is re-added (removed during upstream pull) + latest_available_checkpoint: CheckpointSequenceNumber, + cursor: Option, + limit: usize, + descending_order: bool, + ) -> IndexerResult>> { + if limit == 0 { + return Ok(vec![]); + } + + let seq_nums: Vec = match (cursor, descending_order) { + // descending with cursor: start from cursor - 1, go down `limit` items. + (Some(cursor), true) => { + let end = cursor.saturating_sub(limit as u64); + (end..cursor).rev().collect() + } + // ascending with cursor: start from cursor + 1, go up `limit` items. + (Some(cursor), false) => { + let start = cursor + 1; + let end = (start + limit as u64).min(latest_available_checkpoint + 1); + (start..end).collect() + } + // descending without cursor: start from latest, go down `limit` items. + (None, true) => { + let start = latest_available_checkpoint.saturating_sub(limit as u64 - 1); + (start..=latest_available_checkpoint).rev().collect() + } + // ascending without cursor: start from 0, go up `limit` items. + (None, false) => { + let end = (latest_available_checkpoint + 1).min(limit as u64); + (0..end).collect() + } + }; + + let (summaries, contents) = tokio::try_join!( + self.client + .multi_get_checkpoints_summaries_by_sequence_numbers(&seq_nums), + self.client.multi_get_checkpoints_contents(&seq_nums) + )?; + + let checkpoints = summaries + .into_iter() + .zip(contents) + .map(|(s, c)| Some(StoredCheckpoint::from((s?, c?)))) + .collect(); + + Ok(checkpoints) + } + + /// Fetches events belonging to the provided transaction digest. + pub(crate) async fn events( + &self, + tx_digest: TransactionDigest, + ) -> IndexerResult> { + let tx_digests = &[tx_digest]; + let (events, checkpoint_summaries) = tokio::try_join!( + self.client.multi_get_events_by_tx_digests(tx_digests), + self.resolve_checkpoint_summaries_by_tx_digests(tx_digests) + )?; + + let Some(historical_event) = events.into_iter().next().and_then(|events| { + let summary = checkpoint_summaries.get(&tx_digest).cloned()?; + Some(HistoricalFallbackEvents::new(events?, summary)) + }) else { + return Ok(vec![]); + }; + + historical_event + .into_iota_events(self.package_resolver.clone(), tx_digest) + .await + } + + /// Fetches transactions from the provided transaction digests. + /// + /// # NOTE + /// The `StoredTransaction.tx_sequence_number` is hardcoded to 0, due to + /// missing data in the historical fallback. It can be derived but the + /// operations could be expensive. Can be added in future iterations. + pub(crate) async fn transactions( + &self, + tx_digests: &[TransactionDigest], + ) -> IndexerResult>> { + let (transactions, effects, events, checkpoint_summaries) = tokio::try_join!( + self.client.multi_get_transactions(tx_digests), + self.client.multi_get_effects(tx_digests), + self.client.multi_get_events_by_tx_digests(tx_digests), + self.resolve_checkpoint_summaries_by_tx_digests(tx_digests) + )?; + + let futures = + izip!(transactions, effects, events).map(|(transaction, effects, events)| async { + let (Some(transaction), Some(effects)) = (transaction, effects) else { + return Ok(None); + }; + + let Some(summary) = checkpoint_summaries.get(transaction.digest()).cloned() else { + return Ok(None); + }; + + let (input_objects, output_objects) = self + .resolve_transaction_input_output_objects(&effects) + .await?; + + let checkpoint_transaction = CheckpointTransaction { + transaction, + effects, + events, + input_objects, + output_objects, + }; + + HistoricalFallbackTransaction::new(checkpoint_transaction, summary) + .into_stored_transaction() + .await + .map(Some) + }); + + future::try_join_all(futures).await + } + + /// Fetches objects by their ID and version from historical fallback + /// storage. + /// + /// - If `before_version` is `false`, it looks for the exact version. + /// - If `true`, it finds the latest version before the given one. + /// + /// # Note + /// + /// Currently only supports `before_version = false`. + /// + /// Support for `before_version = true` will be added once range scan is + /// implemented on the KV REST API. + pub(crate) async fn objects( + &self, + object_refs: &[(ObjectID, SequenceNumber)], + before_version: bool, + ) -> IndexerResult>> { + if before_version { + // TODO: Implement once range scan is available on KV REST API + // For now, we cannot determine the correct previous version without it due to + // non-contiguous object versioning: + // https://docs.iota.org/developer/iota-101/objects/versioning#move-objects. + return Ok(vec![None; object_refs.len()]); + } + + let stored_objects = self + .client + .multi_get_objects(object_refs) + .await? + .into_iter() + .map(|obj| obj.map(StoredObject::from)) + .collect(); + + Ok(stored_objects) + } + + /// Fetches transactions belonging to a specific checkpoint. + /// + /// Returns transactions in paginated form, supporting both ascending and + /// descending order within the checkpoint. + /// + /// # Pagination Behavior + /// + /// | cursor | descending | Result | + /// |--------|------------|--------| + /// | `None` | `false` | Starts from first transaction in checkpoint | + /// | `None` | `true` | Starts from last transaction in checkpoint | + /// | `Some(tx)` | `false` | Starts after `tx`, ascending | + /// | `Some(tx)` | `true` | Starts after `tx`, descending | + pub(crate) async fn checkpoint_transactions( + &self, + cursor: Option, + checkpoint_sequence_number: CheckpointSequenceNumber, + limit: usize, + is_descending: bool, + ) -> IndexerResult>> { + if limit == 0 { + return Ok(vec![]); + } + + let Some(contents) = self + .client + .multi_get_checkpoints_contents(&[checkpoint_sequence_number]) + .await? + .into_iter() + .next() + .flatten() + else { + return Ok(vec![]); + }; + + let tx_digests: Vec = contents.iter().map(|b| b.transaction).collect(); + + // apply ordering + let tx_digests: Vec = if is_descending { + tx_digests.into_iter().rev().collect() + } else { + tx_digests + }; + + // apply cursor: skip transactions until after the cursor. + // + // This relies on transactions being ordered within checkpoint contents, + // so we can skip until we find the cursor, then skip the cursor itself. + let tx_digests: Vec = if let Some(cursor) = cursor { + tx_digests + .into_iter() + .skip_while(|digest| *digest != cursor) + .skip(1) // skip the cursor itself + .collect() + } else { + tx_digests + }; + + // apply limit + let tx_digests = tx_digests + .into_iter() + .take(limit) + .collect::>(); + + self.transactions(&tx_digests).await + } + + /// Fetches events for a specific transaction. + /// + /// Returns events emitted by the specified transaction, with support for + /// cursor-based pagination and ordering. + /// + /// # Pagination Behavior + /// + /// Events are indexed by their position in the transaction (event_seq = 0, + /// 1, 2, ...). + /// + /// | cursor | descending | Result | + /// |--------|------------|--------| + /// | `None` | `false` | Starts from event_seq 0 | + /// | `None` | `true` | Starts from last event | + /// | `Some(seq)` | `false` | Starts after event_seq | + /// | `Some(seq)` | `true` | Starts before event_seq | + pub(crate) async fn transaction_events( + &self, + tx_digest: TransactionDigest, + cursor: Option, + limit: usize, + descending_order: bool, + ) -> IndexerResult> { + if limit == 0 { + return Ok(vec![]); + } + + // validate cursor if provided + let start_seq = if let Some(cursor) = cursor { + if cursor.tx_digest != tx_digest { + return Err(IndexerError::InvalidArgument(format!( + "Cursor tx_digest {} does not match requested tx_digest {}", + cursor.tx_digest, tx_digest + ))); + } + Some(cursor.event_seq) + } else { + None + }; + + let events = self.events(tx_digest).await?; + + // apply ordering, cursor, and limit + let events = if descending_order { + events + .into_iter() + .enumerate() + .rev() // reverse for descending + .filter(|(idx, _)| start_seq.is_none_or(|seq| (*idx as u64) < seq)) + .take(limit) + .map(|(_, event)| event) + .collect() + } else { + events + .into_iter() + .enumerate() + .filter(|(idx, _)| start_seq.is_none_or(|seq| (*idx as u64) > seq)) + .take(limit) + .map(|(_, event)| event) + .collect() + }; + + Ok(events) + } +} From 4154485f7b251f64ed96bbfcfcafd89e325c4d30 Mon Sep 17 00:00:00 2001 From: Sergiu Popescu Date: Tue, 2 Dec 2025 11:00:27 +0100 Subject: [PATCH 2/3] fixup! feat(iota-indexer): add HistoricalFallbackReader as a high level client to retrieve data for indexer JSON RPC --- crates/iota-indexer/src/errors.rs | 2 + .../src/historical_fallback/convert.rs | 39 ++++--- .../src/historical_fallback/reader.rs | 105 +++++++++--------- 3 files changed, 76 insertions(+), 70 deletions(-) diff --git a/crates/iota-indexer/src/errors.rs b/crates/iota-indexer/src/errors.rs index c70db175eca..4b4aeb67393 100644 --- a/crates/iota-indexer/src/errors.rs +++ b/crates/iota-indexer/src/errors.rs @@ -159,6 +159,8 @@ pub enum IndexerError { object_id: ObjectID, version: SequenceNumber, }, + #[error("historical fallback storage error: {0}")] + HistoricalFallbackStorageError(String), } pub type IndexerResult = Result; diff --git a/crates/iota-indexer/src/historical_fallback/convert.rs b/crates/iota-indexer/src/historical_fallback/convert.rs index d3aac9461d3..19f21e204ed 100644 --- a/crates/iota-indexer/src/historical_fallback/convert.rs +++ b/crates/iota-indexer/src/historical_fallback/convert.rs @@ -15,15 +15,13 @@ use iota_rest_api::CheckpointTransaction; use iota_types::{ digests::TransactionDigest, effects::TransactionEvents, - messages_checkpoint::{ - CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber, - }, + messages_checkpoint::{CertifiedCheckpointSummary, CheckpointContents}, object::Object, }; use prometheus::Registry; use crate::{ - errors::IndexerResult, + errors::{IndexerError, IndexerResult}, ingestion::{common::prepare::extract_df_kind, primary::prepare::PrimaryWorker}, metrics::IndexerMetrics, models::{ @@ -114,40 +112,41 @@ pub struct HistoricalFallbackTransaction { /// Checkpointed transaction data. checkpoint_transaction: CheckpointTransaction, /// Checkpoint sequence number the transaction is part of. - checkpoint_sequence_number: CheckpointSequenceNumber, - /// Checkpoint timestamp. - timestamp: u64, + historical_checkpoint: HistoricalFallbackCheckpoint, } impl HistoricalFallbackTransaction { pub fn new( checkpoint_transaction: CheckpointTransaction, - checkpoint_summary: CertifiedCheckpointSummary, + historical_checkpoint: HistoricalFallbackCheckpoint, ) -> Self { Self { checkpoint_transaction, - checkpoint_sequence_number: checkpoint_summary.sequence_number, - timestamp: checkpoint_summary.timestamp_ms, + historical_checkpoint, } } /// Converts the historical fallback transaction into a /// [`StoredTransaction`]. pub(crate) async fn into_stored_transaction(self) -> IndexerResult { - // StoredTransaction::try_into_iota_transaction_block_response implementation - // does not use the `tx_sequence_number`, in this regard it is safe to - // hardcode to 0. - // - // If in future iterations, the `tx_sequence_number` will be needed, by - // importing the CheckpointContents we'll be able to derive it by using the - // CheckpointContents::enumerate_transactions method. - let tx_sequence_number = 0; + let tx_digest = self.checkpoint_transaction.transaction.digest(); + let (summary, contents) = self.historical_checkpoint; + + let Some(tx_sequence_number) = contents + .enumerate_transactions(&summary) + .find(|(_seq, execution_digest)| &execution_digest.transaction == tx_digest) + .map(|(seq, _execution_digest)| seq) + else { + return Err(IndexerError::HistoricalFallbackStorageError(format!( + "cannot find transaction sequence number to transaction: {tx_digest}" + ))); + }; let indexed_tx = PrimaryWorker::index_transaction( &self.checkpoint_transaction, tx_sequence_number, - self.checkpoint_sequence_number, - self.timestamp, + summary.sequence_number, + summary.timestamp_ms, &IndexerMetrics::new(&Registry::new()), ) .await?; diff --git a/crates/iota-indexer/src/historical_fallback/reader.rs b/crates/iota-indexer/src/historical_fallback/reader.rs index f6bdcc7b46b..3a8a595ac5f 100644 --- a/crates/iota-indexer/src/historical_fallback/reader.rs +++ b/crates/iota-indexer/src/historical_fallback/reader.rs @@ -21,16 +21,20 @@ use iota_types::{ digests::TransactionDigest, effects::{TransactionEffects, TransactionEffectsAPI}, event::EventID, - messages_checkpoint::{CertifiedCheckpointSummary, CheckpointSequenceNumber}, + messages_checkpoint::{ + CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber, + }, object::Object, }; -use itertools::{Itertools, izip}; +use itertools::{Either, Itertools, izip}; use crate::{ errors::{IndexerError, IndexerResult}, historical_fallback::{ client::{HttpRestKVClient, KeyValueStoreClient}, - convert::{HistoricalFallbackEvents, HistoricalFallbackTransaction}, + convert::{ + HistoricalFallbackCheckpoint, HistoricalFallbackEvents, HistoricalFallbackTransaction, + }, }, models::{ checkpoints::StoredCheckpoint, objects::StoredObject, transactions::StoredTransaction, @@ -55,14 +59,6 @@ pub(crate) struct HistoricalFallbackReader { /// Client responsible for fetching data from the historical fallback /// storage through REST API interface. client: HttpRestKVClient, - // TODO: The package resolver should support fetching an object by ID from fallback storage - // in case the objects table is pruned in the indexer. - // - // Current approach: We use the package resolver to fetch packages by ID from Postgres. - // - // Future requirements: Once the objects table is pruned, the package resolver will need - // to fetch objects by ID from the historical fallback reader. This depends on the range - // scan feature being implemented on the KV REST API side. package_resolver: PackageResolver, } @@ -118,11 +114,12 @@ impl HistoricalFallbackReader { Ok((input_objects, output_objects)) } - /// Resolves the checkpoint summaries by the provided transaction digests - pub(crate) async fn resolve_checkpoint_summaries_by_tx_digests( + /// Resolves the checkpoint summaries and contents by the provided + /// transaction digests + pub(crate) async fn resolve_checkpoints( &self, tx_digests: &[TransactionDigest], - ) -> IndexerResult> { + ) -> IndexerResult> { let tx_to_checkpoint_seq_num = self .client .multi_get_transactions_perpetual_checkpoints(tx_digests) @@ -137,14 +134,19 @@ impl HistoricalFallbackReader { .copied() .collect::>(); - let summaries_by_seq = self - .client - .multi_get_checkpoints_summaries_by_sequence_numbers(&unique_seq_nums) - .await? + let (summaries, contents) = tokio::try_join!( + self.client + .multi_get_checkpoints_summaries_by_sequence_numbers(&unique_seq_nums), + self.client.multi_get_checkpoints_contents(&unique_seq_nums) + )?; + + let checkpoints_map = summaries .into_iter() - .flatten() - .map(|summary| (summary.sequence_number, summary)) - .collect::>(); + .zip(contents.into_iter()) + .filter_map(|(summary, contents)| { + summary.and_then(|summary| contents.map(|contents| (summary.sequence_number, (summary, contents)))) + }) + .collect::>(); // map each tx digest to its checkpoint summary let summaries = tx_digests @@ -152,10 +154,10 @@ impl HistoricalFallbackReader { .zip(tx_to_checkpoint_seq_num) .filter_map(|(digest, seq_num)| { let seq = seq_num?; - summaries_by_seq + checkpoints_map .get(&seq) .cloned() - .map(|summary| (*digest, summary)) + .map(|(summary, contents)| (*digest, (summary, contents))) }) .collect(); @@ -224,7 +226,7 @@ impl HistoricalFallbackReader { /// | `Some(n)` | `true` | Starts from checkpoint n-1 | /// /// # NOTE - /// `StoredCheckpoint.sequence_number` is hardcoded to 0, due to missing + /// `StoredCheckpoint.successful_tx_num` is hardcoded to 0, due to missing /// data in the historical fallback. It can be derived but the operations /// could be expensive. Can be added in future iterations. pub(crate) async fn checkpoints( @@ -290,36 +292,31 @@ impl HistoricalFallbackReader { let tx_digests = &[tx_digest]; let (events, checkpoint_summaries) = tokio::try_join!( self.client.multi_get_events_by_tx_digests(tx_digests), - self.resolve_checkpoint_summaries_by_tx_digests(tx_digests) + self.resolve_checkpoints(tx_digests) )?; - let Some(historical_event) = events.into_iter().next().and_then(|events| { - let summary = checkpoint_summaries.get(&tx_digest).cloned()?; + let Some(historical_events) = events.into_iter().next().and_then(|events| { + let (summary, _) = checkpoint_summaries.get(&tx_digest).cloned()?; Some(HistoricalFallbackEvents::new(events?, summary)) }) else { return Ok(vec![]); }; - historical_event + historical_events .into_iota_events(self.package_resolver.clone(), tx_digest) .await } /// Fetches transactions from the provided transaction digests. - /// - /// # NOTE - /// The `StoredTransaction.tx_sequence_number` is hardcoded to 0, due to - /// missing data in the historical fallback. It can be derived but the - /// operations could be expensive. Can be added in future iterations. pub(crate) async fn transactions( &self, tx_digests: &[TransactionDigest], ) -> IndexerResult>> { - let (transactions, effects, events, checkpoint_summaries) = tokio::try_join!( + let (transactions, effects, events, checkpoints) = tokio::try_join!( self.client.multi_get_transactions(tx_digests), self.client.multi_get_effects(tx_digests), self.client.multi_get_events_by_tx_digests(tx_digests), - self.resolve_checkpoint_summaries_by_tx_digests(tx_digests) + self.resolve_checkpoints(tx_digests), )?; let futures = @@ -328,9 +325,17 @@ impl HistoricalFallbackReader { return Ok(None); }; - let Some(summary) = checkpoint_summaries.get(transaction.digest()).cloned() else { - return Ok(None); - }; + let historical_checkpoint = checkpoints + .get(transaction.digest()) + .cloned() + // if transaction exists but summary is not found this indicates a bug in data + // consistency in the KV Store. + .ok_or_else(|| { + IndexerError::HistoricalFallbackStorageError(format!( + "checkpoint summary and contents linked to transaction: {} not found", + transaction.digest() + )) + })?; let (input_objects, output_objects) = self .resolve_transaction_input_output_objects(&effects) @@ -344,7 +349,7 @@ impl HistoricalFallbackReader { output_objects, }; - HistoricalFallbackTransaction::new(checkpoint_transaction, summary) + HistoricalFallbackTransaction::new(checkpoint_transaction, historical_checkpoint) .into_stored_transaction() .await .map(Some) @@ -424,27 +429,27 @@ impl HistoricalFallbackReader { return Ok(vec![]); }; - let tx_digests: Vec = contents.iter().map(|b| b.transaction).collect(); + let tx_digests = contents.iter().map(|b| b.transaction); // apply ordering - let tx_digests: Vec = if is_descending { - tx_digests.into_iter().rev().collect() + let tx_digests = if is_descending { + Either::Left(tx_digests.rev()) } else { - tx_digests + Either::Right(tx_digests) }; // apply cursor: skip transactions until after the cursor. // // This relies on transactions being ordered within checkpoint contents, // so we can skip until we find the cursor, then skip the cursor itself. - let tx_digests: Vec = if let Some(cursor) = cursor { - tx_digests - .into_iter() - .skip_while(|digest| *digest != cursor) - .skip(1) // skip the cursor itself - .collect() + let tx_digests = if let Some(cursor) = cursor { + Either::Left( + tx_digests + .skip_while(move |digest| *digest != cursor) + .skip(1), // skip the cursor itself + ) } else { - tx_digests + Either::Right(tx_digests) }; // apply limit From bc8baeaad6a05addb1651126b75b15e5cc540b08 Mon Sep 17 00:00:00 2001 From: Sergiu Popescu Date: Wed, 3 Dec 2025 12:22:27 +0100 Subject: [PATCH 3/3] fixup! fixup! feat(iota-indexer): add HistoricalFallbackReader as a high level client to retrieve data for indexer JSON RPC --- .../src/historical_fallback/reader.rs | 89 ++++++++----------- 1 file changed, 36 insertions(+), 53 deletions(-) diff --git a/crates/iota-indexer/src/historical_fallback/reader.rs b/crates/iota-indexer/src/historical_fallback/reader.rs index 3a8a595ac5f..bda1fdc47b4 100644 --- a/crates/iota-indexer/src/historical_fallback/reader.rs +++ b/crates/iota-indexer/src/historical_fallback/reader.rs @@ -218,12 +218,10 @@ impl HistoricalFallbackReader { /// /// # Pagination Behavior /// - /// | cursor | descending | Result | - /// |--------|------------|--------| - /// | `None` | `false` | Starts from checkpoint 0 | - /// | `None` | `true` | Starts from latest checkpoint | - /// | `Some(n)` | `false` | Starts from checkpoint n+1 | - /// | `Some(n)` | `true` | Starts from checkpoint n-1 | + /// | cursor | descending | Result | + /// |--------|------------|-----------------------------| + /// | `n` | `false` | Starts from checkpoint n+1 | + /// | `n` | `true` | Starts from checkpoint n-1 | /// /// # NOTE /// `StoredCheckpoint.successful_tx_num` is hardcoded to 0, due to missing @@ -231,13 +229,7 @@ impl HistoricalFallbackReader { /// could be expensive. Can be added in future iterations. pub(crate) async fn checkpoints( &self, - // TODO: The `latest_checkpoint` parameter is a temporary workaround. The KV store - // doesn't currently have a way to determine the latest available checkpoint. - // This can be removed once either: - // - Range scan is implemented on the KV REST API, or - // - The `get_latest_checkpoint()` method is re-added (removed during upstream pull) - latest_available_checkpoint: CheckpointSequenceNumber, - cursor: Option, + cursor: CheckpointSequenceNumber, limit: usize, descending_order: bool, ) -> IndexerResult>> { @@ -245,30 +237,21 @@ impl HistoricalFallbackReader { return Ok(vec![]); } - let seq_nums: Vec = match (cursor, descending_order) { - // descending with cursor: start from cursor - 1, go down `limit` items. - (Some(cursor), true) => { - let end = cursor.saturating_sub(limit as u64); - (end..cursor).rev().collect() - } - // ascending with cursor: start from cursor + 1, go up `limit` items. - (Some(cursor), false) => { - let start = cursor + 1; - let end = (start + limit as u64).min(latest_available_checkpoint + 1); - (start..end).collect() - } - // descending without cursor: start from latest, go down `limit` items. - (None, true) => { - let start = latest_available_checkpoint.saturating_sub(limit as u64 - 1); - (start..=latest_available_checkpoint).rev().collect() - } - // ascending without cursor: start from 0, go up `limit` items. - (None, false) => { - let end = (latest_available_checkpoint + 1).min(limit as u64); - (0..end).collect() - } + let seq_nums: Vec = if descending_order { + // descending: start from cursor - 1, go down `limit` items + let end = cursor.saturating_sub(limit as u64); + (end..cursor).rev().collect() + } else { + // ascending: start from cursor + 1, go up `limit` items + let start = cursor + 1; + let end = start + limit as u64; + (start..end).collect() }; + if seq_nums.is_empty() { + return Ok(vec![]); + } + let (summaries, contents) = tokio::try_join!( self.client .multi_get_checkpoints_summaries_by_sequence_numbers(&seq_nums), @@ -284,8 +267,8 @@ impl HistoricalFallbackReader { Ok(checkpoints) } - /// Fetches events belonging to the provided transaction digest. - pub(crate) async fn events( + /// Fetches all events belonging to the provided transaction digest. + pub(crate) async fn all_events( &self, tx_digest: TransactionDigest, ) -> IndexerResult> { @@ -401,12 +384,12 @@ impl HistoricalFallbackReader { /// /// # Pagination Behavior /// - /// | cursor | descending | Result | - /// |--------|------------|--------| - /// | `None` | `false` | Starts from first transaction in checkpoint | - /// | `None` | `true` | Starts from last transaction in checkpoint | - /// | `Some(tx)` | `false` | Starts after `tx`, ascending | - /// | `Some(tx)` | `true` | Starts after `tx`, descending | + /// | cursor | descending | Result | + /// |------------|------------|---------------------------------------------| + /// | `None` | `false` | Starts from first transaction in checkpoint | + /// | `None` | `true` | Starts from last transaction in checkpoint | + /// | `Some(tx)` | `false` | Starts after `tx`, ascending | + /// | `Some(tx)` | `true` | Starts after `tx`, descending | pub(crate) async fn checkpoint_transactions( &self, cursor: Option, @@ -471,13 +454,13 @@ impl HistoricalFallbackReader { /// Events are indexed by their position in the transaction (event_seq = 0, /// 1, 2, ...). /// - /// | cursor | descending | Result | - /// |--------|------------|--------| - /// | `None` | `false` | Starts from event_seq 0 | - /// | `None` | `true` | Starts from last event | - /// | `Some(seq)` | `false` | Starts after event_seq | - /// | `Some(seq)` | `true` | Starts before event_seq | - pub(crate) async fn transaction_events( + /// | cursor | descending | Result | + /// |-------------|------------|--------------------------| + /// | `None` | `false` | Starts from event_seq 0 | + /// | `None` | `true` | Starts from last event | + /// | `Some(seq)` | `false` | Starts after event_seq | + /// | `Some(seq)` | `true` | Starts before event_seq | + pub(crate) async fn events( &self, tx_digest: TransactionDigest, cursor: Option, @@ -492,8 +475,8 @@ impl HistoricalFallbackReader { let start_seq = if let Some(cursor) = cursor { if cursor.tx_digest != tx_digest { return Err(IndexerError::InvalidArgument(format!( - "Cursor tx_digest {} does not match requested tx_digest {}", - cursor.tx_digest, tx_digest + "Cursor tx_digest {} does not match requested tx_digest {tx_digest}", + cursor.tx_digest ))); } Some(cursor.event_seq) @@ -501,7 +484,7 @@ impl HistoricalFallbackReader { None }; - let events = self.events(tx_digest).await?; + let events = self.all_events(tx_digest).await?; // apply ordering, cursor, and limit let events = if descending_order {