diff --git a/crates/iota-indexer/src/errors.rs b/crates/iota-indexer/src/errors.rs index 8b6f9edd4c7..4b4aeb67393 100644 --- a/crates/iota-indexer/src/errors.rs +++ b/crates/iota-indexer/src/errors.rs @@ -7,7 +7,7 @@ use iota_data_ingestion_core::IngestionError; use iota_json_rpc_api::{error_object_from_rpc, internal_error}; use iota_names::error::IotaNamesError; use iota_types::{ - base_types::ObjectIDParseError, + base_types::{ObjectID, ObjectIDParseError, SequenceNumber}, error::{IotaError, IotaObjectResponseError, UserInputError}, }; use jsonrpsee::{core::ClientError as RpcError, types::ErrorObjectOwned}; @@ -153,6 +153,14 @@ pub enum IndexerError { #[error("Transaction dependencies have not been indexed")] TransactionDependenciesNotIndexed, + + #[error("historical fallback object not found: id {object_id}, version {version}")] + HistoricalFallbackObjectNotFound { + object_id: ObjectID, + version: SequenceNumber, + }, + #[error("historical fallback storage error: {0}")] + HistoricalFallbackStorageError(String), } pub type IndexerResult = Result; diff --git a/crates/iota-indexer/src/historical_fallback/client.rs b/crates/iota-indexer/src/historical_fallback/client.rs index ae1ec57fffc..38d85855733 100644 --- a/crates/iota-indexer/src/historical_fallback/client.rs +++ b/crates/iota-indexer/src/historical_fallback/client.rs @@ -1,6 +1,8 @@ // Copyright (c) 2025 IOTA Stiftung // SPDX-License-Identifier: Apache-2.0 +//! Module containing the client for interacting with the REST API KV server. 
+ use std::str::FromStr; use bytes::Bytes; @@ -26,7 +28,6 @@ use tracing::{error, info, instrument, trace, warn}; use crate::errors::IndexerResult; -#[expect(dead_code)] pub(crate) trait KeyValueStoreClient { async fn multi_get_transactions( &self, @@ -75,7 +76,6 @@ pub(crate) struct HttpRestKVClient { } impl HttpRestKVClient { - #[expect(dead_code)] pub fn new(base_url: &str) -> IndexerResult { info!("creating HttpRestKVClient with base_url: {}", base_url); @@ -98,7 +98,6 @@ impl HttpRestKVClient { Ok(Url::from_str(joined.as_str())?) } - #[expect(dead_code)] async fn multi_fetch(&self, uris: Vec) -> Vec>> { let len = uris.len(); let fetches = stream::iter(uris.into_iter().map(|url| self.fetch(url))); @@ -138,7 +137,6 @@ where .ok() } -#[expect(dead_code)] fn map_fetch<'a, K>(fetch: (&'a IndexerResult>, &'a K)) -> Option<(&'a Bytes, &'a K)> where K: std::fmt::Debug, @@ -154,7 +152,6 @@ where } } -#[expect(dead_code)] fn deser_check_digest( digest: &D, bytes: &Bytes, diff --git a/crates/iota-indexer/src/historical_fallback/convert.rs b/crates/iota-indexer/src/historical_fallback/convert.rs index 003fdae7ecf..19f21e204ed 100644 --- a/crates/iota-indexer/src/historical_fallback/convert.rs +++ b/crates/iota-indexer/src/historical_fallback/convert.rs @@ -15,15 +15,13 @@ use iota_rest_api::CheckpointTransaction; use iota_types::{ digests::TransactionDigest, effects::TransactionEvents, - messages_checkpoint::{ - CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber, - }, + messages_checkpoint::{CertifiedCheckpointSummary, CheckpointContents}, object::Object, }; use prometheus::Registry; use crate::{ - errors::IndexerResult, + errors::{IndexerError, IndexerResult}, ingestion::{common::prepare::extract_df_kind, primary::prepare::PrimaryWorker}, metrics::IndexerMetrics, models::{ @@ -37,13 +35,13 @@ use crate::{ /// Alias for an [`Object`] fetched from historical fallback storage. /// /// Contains all data needed to reconstruct a [`StoredObject`]. 
-type HistoricalFallbackObject = Object; +pub(crate) type HistoricalFallbackObject = Object; /// Alias for [`CertifiedCheckpointSummary`] with its [`CheckpointContents`] /// data fetched from historical fallback storage. /// /// Contains all data needed to reconstruct a [`StoredCheckpoint`]. -type HistoricalFallbackCheckpoint = (CertifiedCheckpointSummary, CheckpointContents); +pub(crate) type HistoricalFallbackCheckpoint = (CertifiedCheckpointSummary, CheckpointContents); impl From for StoredObject { fn from(object: HistoricalFallbackObject) -> Self { @@ -81,7 +79,6 @@ pub struct HistoricalFallbackEvents { } impl HistoricalFallbackEvents { - #[expect(dead_code)] pub fn new(events: TransactionEvents, checkpoint_summary: CertifiedCheckpointSummary) -> Self { Self { events, @@ -91,7 +88,6 @@ impl HistoricalFallbackEvents { /// Converts the raw [`TransactionEvents`] into JSON RPC compatible /// [`IotaEvent`]s. - #[expect(dead_code)] pub(crate) async fn into_iota_events( self, package_resolver: Arc>, @@ -116,42 +112,41 @@ pub struct HistoricalFallbackTransaction { /// Checkpointed transaction data. checkpoint_transaction: CheckpointTransaction, /// Checkpoint sequence number the transaction is part of. - checkpoint_sequence_number: CheckpointSequenceNumber, - /// Checkpoint timestamp. - timestamp: u64, + historical_checkpoint: HistoricalFallbackCheckpoint, } impl HistoricalFallbackTransaction { - #[expect(dead_code)] pub fn new( checkpoint_transaction: CheckpointTransaction, - checkpoint_summary: CertifiedCheckpointSummary, + historical_checkpoint: HistoricalFallbackCheckpoint, ) -> Self { Self { checkpoint_transaction, - checkpoint_sequence_number: checkpoint_summary.sequence_number, - timestamp: checkpoint_summary.timestamp_ms, + historical_checkpoint, } } /// Converts the historical fallback transaction into a /// [`StoredTransaction`]. 
- #[expect(dead_code)] - async fn into_stored_transaction(self) -> IndexerResult { - // StoredTransaction::try_into_iota_transaction_block_response implementation - // does not use the `tx_sequence_number`, in this regard it is safe to - // hardcode to 0. - // - // If in future iterations, the `tx_sequence_number` will be needed, by - // importing the CheckpointContents we'll be able to derive it by using the - // CheckpointContents::enumerate_transactions method. - let tx_sequence_number = 0; + pub(crate) async fn into_stored_transaction(self) -> IndexerResult { + let tx_digest = self.checkpoint_transaction.transaction.digest(); + let (summary, contents) = self.historical_checkpoint; + + let Some(tx_sequence_number) = contents + .enumerate_transactions(&summary) + .find(|(_seq, execution_digest)| &execution_digest.transaction == tx_digest) + .map(|(seq, _execution_digest)| seq) + else { + return Err(IndexerError::HistoricalFallbackStorageError(format!( + "cannot find transaction sequence number to transaction: {tx_digest}" + ))); + }; let indexed_tx = PrimaryWorker::index_transaction( &self.checkpoint_transaction, tx_sequence_number, - self.checkpoint_sequence_number, - self.timestamp, + summary.sequence_number, + summary.timestamp_ms, &IndexerMetrics::new(&Registry::new()), ) .await?; diff --git a/crates/iota-indexer/src/historical_fallback/mod.rs b/crates/iota-indexer/src/historical_fallback/mod.rs index c6d9f86af69..7d80b359e87 100644 --- a/crates/iota-indexer/src/historical_fallback/mod.rs +++ b/crates/iota-indexer/src/historical_fallback/mod.rs @@ -10,3 +10,4 @@ pub(crate) mod client; pub(crate) mod convert; +pub(crate) mod reader; diff --git a/crates/iota-indexer/src/historical_fallback/reader.rs b/crates/iota-indexer/src/historical_fallback/reader.rs new file mode 100644 index 00000000000..bda1fdc47b4 --- /dev/null +++ b/crates/iota-indexer/src/historical_fallback/reader.rs @@ -0,0 +1,511 @@ +// Copyright (c) 2025 IOTA Stiftung +// SPDX-License-Identifier: 
Apache-2.0
+
+//! Module containing the historical fallback reader implementation.
+//!
+//! This module provides a high-level client to interact with the historical
+//! fallback storage. It enables integration with the
+//! [`IndexerReader`](crate::read::IndexerReader) for fallback capabilities when
+//! the indexer is unable to fetch data from the database, which is especially
+//! useful when pruning is enabled.
+
+#![expect(dead_code)]
+
+use std::collections::HashMap;
+
+use futures::future;
+use iota_json_rpc_types::{CheckpointId, IotaEvent};
+use iota_rest_api::CheckpointTransaction;
+use iota_types::{
+    base_types::{ObjectID, SequenceNumber},
+    digests::TransactionDigest,
+    effects::{TransactionEffects, TransactionEffectsAPI},
+    event::EventID,
+    messages_checkpoint::{
+        CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
+    },
+    object::Object,
+};
+use itertools::{Either, Itertools, izip};
+
+use crate::{
+    errors::{IndexerError, IndexerResult},
+    historical_fallback::{
+        client::{HttpRestKVClient, KeyValueStoreClient},
+        convert::{
+            HistoricalFallbackCheckpoint, HistoricalFallbackEvents, HistoricalFallbackTransaction,
+        },
+    },
+    models::{
+        checkpoints::StoredCheckpoint, objects::StoredObject, transactions::StoredTransaction,
+    },
+    read::PackageResolver,
+};
+
+/// Represents the Input objects of a transaction.
+pub type InputObjects = Vec<Object>;
+/// Represents the Output objects of a transaction.
+pub type OutputObjects = Vec<Object>;
+
+/// A high-level client to interact with the historical fallback storage.
+///
+/// Provides convenient methods to fetch data from the historical fallback
+/// storage, with automatic conversions into types the Indexer uses when
+/// fetching from the database. This enables integration with the
+/// [`IndexerReader`](crate::read::IndexerReader) for fallback capabilities
+/// when the indexer is unable to fetch data from the database, which is
+/// especially useful when pruning is enabled.
+pub(crate) struct HistoricalFallbackReader {
+    /// Client responsible for fetching data from the historical fallback
+    /// storage through REST API interface.
+    client: HttpRestKVClient,
+    /// Resolver passed to the event conversion when building JSON RPC events.
+    package_resolver: PackageResolver,
+}
+
+impl HistoricalFallbackReader {
+    /// Creates a reader backed by an [`HttpRestKVClient`] for `rest_kv_url`.
+    ///
+    /// # Errors
+    ///
+    /// Propagates any error returned by [`HttpRestKVClient::new`].
+    pub fn new(rest_kv_url: &str, package_resolver: PackageResolver) -> IndexerResult<Self> {
+        Ok(Self {
+            client: HttpRestKVClient::new(rest_kv_url)?,
+            package_resolver,
+        })
+    }
+
+    /// Pairs every fetched object with the key it was requested by and turns
+    /// the first missing one into
+    /// [`IndexerError::HistoricalFallbackObjectNotFound`].
+    fn require_objects(
+        objects: Vec<Option<Object>>,
+        keys: &[(ObjectID, SequenceNumber)],
+    ) -> IndexerResult<Vec<Object>> {
+        objects
+            .into_iter()
+            .zip(keys)
+            .map(|(object, (object_id, version))| {
+                object.ok_or_else(|| IndexerError::HistoricalFallbackObjectNotFound {
+                    object_id: *object_id,
+                    version: *version,
+                })
+            })
+            .collect()
+    }
+
+    /// Resolves the input and output objects from a given transaction effects.
+    ///
+    /// Input objects are looked up by the effects' `modified_at_versions`,
+    /// output objects by the id and version of every changed object.
+    ///
+    /// # Errors
+    ///
+    /// Fails with [`IndexerError::HistoricalFallbackObjectNotFound`] if any of
+    /// the requested objects is missing from the historical fallback storage,
+    /// or with the client error if a lookup itself fails.
+    pub(crate) async fn resolve_transaction_input_output_objects(
+        &self,
+        transaction_effects: &TransactionEffects,
+    ) -> IndexerResult<(InputObjects, OutputObjects)> {
+        let input_object_keys = transaction_effects.modified_at_versions();
+
+        let output_object_keys = transaction_effects
+            .all_changed_objects()
+            .into_iter()
+            .map(|((object_id, version, _object_digest), _owner, _kind)| (object_id, version))
+            .collect::<Vec<_>>();
+
+        // both lookups are independent, run them concurrently.
+        let (raw_input_objects, raw_output_objects) = tokio::try_join!(
+            self.client.multi_get_objects(&input_object_keys),
+            self.client.multi_get_objects(&output_object_keys),
+        )?;
+
+        let input_objects = Self::require_objects(raw_input_objects, &input_object_keys)?;
+        let output_objects = Self::require_objects(raw_output_objects, &output_object_keys)?;
+
+        Ok((input_objects, output_objects))
+    }
+
+    /// Resolves the checkpoint summaries and contents by the provided
+    /// transaction digests.
+    ///
+    /// Digests whose checkpoint sequence number, summary or contents cannot be
+    /// found are absent from the returned map.
+    pub(crate) async fn resolve_checkpoints(
+        &self,
+        tx_digests: &[TransactionDigest],
+    ) -> IndexerResult<HashMap<TransactionDigest, HistoricalFallbackCheckpoint>> {
+        let tx_to_checkpoint_seq_num = self
+            .client
+            .multi_get_transactions_perpetual_checkpoints(tx_digests)
+            .await?;
+
+        // deduplicate checkpoint sequence numbers to avoid fetching the same summary
+        // multiple times.
+        let unique_seq_nums = tx_to_checkpoint_seq_num
+            .iter()
+            .flatten()
+            .unique()
+            .copied()
+            .collect::<Vec<_>>();
+
+        let (summaries, contents) = tokio::try_join!(
+            self.client
+                .multi_get_checkpoints_summaries_by_sequence_numbers(&unique_seq_nums),
+            self.client.multi_get_checkpoints_contents(&unique_seq_nums)
+        )?;
+
+        // keep only checkpoints for which both summary and contents were found,
+        // keyed by the sequence number recorded in the summary.
+        let checkpoints_by_seq_num = summaries
+            .into_iter()
+            .zip(contents)
+            .filter_map(|(summary, contents)| {
+                let (summary, contents) = summary.zip(contents)?;
+                Some((summary.sequence_number, (summary, contents)))
+            })
+            .collect::<HashMap<_, _>>();
+
+        // map each tx digest to its checkpoint summary and contents
+        let checkpoints = tx_digests
+            .iter()
+            .zip(tx_to_checkpoint_seq_num)
+            .filter_map(|(digest, seq_num)| {
+                let checkpoint = checkpoints_by_seq_num.get(&seq_num?)?;
+                Some((*digest, checkpoint.clone()))
+            })
+            .collect();
+
+        Ok(checkpoints)
+    }
+
+    /// Fetches a checkpoint by either a [`CheckpointSequenceNumber`] or
+    /// [`CheckpointDigest`](iota_types::digests::CheckpointDigest).
+    ///
+    /// Returns `Ok(None)` when the summary or the contents of the checkpoint
+    /// is not available in the historical fallback storage.
+    pub(crate) async fn checkpoint(
+        &self,
+        id: CheckpointId,
+    ) -> IndexerResult<Option<StoredCheckpoint>> {
+        let (summaries, contents) = match id {
+            CheckpointId::SequenceNumber(sequence_number) => {
+                let seq_nums = [sequence_number];
+                tokio::try_join!(
+                    self.client
+                        .multi_get_checkpoints_summaries_by_sequence_numbers(&seq_nums),
+                    self.client.multi_get_checkpoints_contents(&seq_nums)
+                )?
+            }
+            CheckpointId::Digest(digest) => {
+                let summaries = self
+                    .client
+                    .multi_get_checkpoints_summaries_by_digests(&[digest])
+                    .await?;
+
+                // contents are fetched by sequence number, which is only known
+                // once the summary has been resolved from the digest.
+                let Some(seq_num) = summaries
+                    .first()
+                    .and_then(|summary| summary.as_ref())
+                    .map(|summary| summary.sequence_number)
+                else {
+                    return Ok(None);
+                };
+
+                let contents = self
+                    .client
+                    .multi_get_checkpoints_contents(&[seq_num])
+                    .await?;
+
+                (summaries, contents)
+            }
+        };
+
+        let checkpoint = summaries
+            .into_iter()
+            .zip(contents)
+            .next()
+            .and_then(|(summary, contents)| summary.zip(contents))
+            .map(StoredCheckpoint::from);
+
+        Ok(checkpoint)
+    }
+
+    /// Fetches multiple checkpoints from the historical fallback storage.
+    ///
+    /// Returns checkpoints in paginated form, supporting both ascending and
+    /// descending order.
+    ///
+    /// # Pagination Behavior
+    ///
+    /// | cursor | descending | Result                      |
+    /// |--------|------------|-----------------------------|
+    /// | `n`    | `false`    | Starts from checkpoint n+1  |
+    /// | `n`    | `true`     | Starts from checkpoint n-1  |
+    ///
+    /// # NOTE
+    /// `StoredCheckpoint.successful_tx_num` is hardcoded to 0, due to missing
+    /// data in the historical fallback. It can be derived but the operations
+    /// could be expensive. Can be added in future iterations.
+    pub(crate) async fn checkpoints(
+        &self,
+        cursor: CheckpointSequenceNumber,
+        limit: usize,
+        descending_order: bool,
+    ) -> IndexerResult<Vec<Option<StoredCheckpoint>>> {
+        if limit == 0 {
+            return Ok(vec![]);
+        }
+        // `limit` is at least 1 from here on.
+        let limit = u64::try_from(limit).unwrap_or(u64::MAX);
+
+        let seq_nums: Vec<CheckpointSequenceNumber> = if descending_order {
+            // descending: start from cursor - 1, go down `limit` items
+            let end = cursor.saturating_sub(limit);
+            (end..cursor).rev().collect()
+        } else {
+            // ascending: start from cursor + 1, go up `limit` items.
+            //
+            // A cursor at the maximum sequence number has no successor, and the
+            // upper bound is clamped so that neither addition can overflow.
+            let Some(start) = cursor.checked_add(1) else {
+                return Ok(vec![]);
+            };
+            let last = start.saturating_add(limit - 1);
+            (start..=last).collect()
+        };
+
+        if seq_nums.is_empty() {
+            return Ok(vec![]);
+        }
+
+        let (summaries, contents) = tokio::try_join!(
+            self.client
+                .multi_get_checkpoints_summaries_by_sequence_numbers(&seq_nums),
+            self.client.multi_get_checkpoints_contents(&seq_nums)
+        )?;
+
+        // an entry is `None` when either its summary or its contents is missing.
+        let checkpoints = summaries
+            .into_iter()
+            .zip(contents)
+            .map(|(summary, contents)| summary.zip(contents).map(StoredCheckpoint::from))
+            .collect();
+
+        Ok(checkpoints)
+    }
+
+    /// Fetches all events belonging to the provided transaction digest.
+    ///
+    /// Returns an empty list when either the events or the checkpoint of the
+    /// transaction cannot be found in the historical fallback storage.
+    pub(crate) async fn all_events(
+        &self,
+        tx_digest: TransactionDigest,
+    ) -> IndexerResult<Vec<IotaEvent>> {
+        let tx_digests = &[tx_digest];
+        let (events, mut checkpoints) = tokio::try_join!(
+            self.client.multi_get_events_by_tx_digests(tx_digests),
+            self.resolve_checkpoints(tx_digests)
+        )?;
+
+        let Some(events) = events.into_iter().next().flatten() else {
+            return Ok(vec![]);
+        };
+        // only the summary is needed; take it out of the map instead of cloning
+        // the summary together with the checkpoint contents.
+        let Some((summary, _contents)) = checkpoints.remove(&tx_digest) else {
+            return Ok(vec![]);
+        };
+
+        HistoricalFallbackEvents::new(events, summary)
+            .into_iota_events(self.package_resolver.clone(), tx_digest)
+            .await
+    }
+
+    /// Fetches transactions from the provided transaction digests.
+    pub(crate) async fn transactions(
+        &self,
+        tx_digests: &[TransactionDigest],
+    ) -> IndexerResult<Vec<Option<StoredTransaction>>> {
+        // nothing to resolve, skip the lookups entirely.
+        if tx_digests.is_empty() {
+            return Ok(vec![]);
+        }
+
+        let (transactions, effects, events, checkpoints) = tokio::try_join!(
+            self.client.multi_get_transactions(tx_digests),
+            self.client.multi_get_effects(tx_digests),
+            self.client.multi_get_events_by_tx_digests(tx_digests),
+            self.resolve_checkpoints(tx_digests),
+        )?;
+
+        let futures =
+            izip!(transactions, effects, events).map(|(transaction, effects, events)| async {
+                // an entry is `None` when the transaction or its effects is missing.
+                let (Some(transaction), Some(effects)) = (transaction, effects) else {
+                    return Ok(None);
+                };
+
+                let historical_checkpoint = checkpoints
+                    .get(transaction.digest())
+                    .cloned()
+                    // if transaction exists but summary is not found this indicates a bug in data
+                    // consistency in the KV Store.
+                    .ok_or_else(|| {
+                        IndexerError::HistoricalFallbackStorageError(format!(
+                            "checkpoint summary and contents linked to transaction: {} not found",
+                            transaction.digest()
+                        ))
+                    })?;
+
+                let (input_objects, output_objects) = self
+                    .resolve_transaction_input_output_objects(&effects)
+                    .await?;
+
+                let checkpoint_transaction = CheckpointTransaction {
+                    transaction,
+                    effects,
+                    events,
+                    input_objects,
+                    output_objects,
+                };
+
+                HistoricalFallbackTransaction::new(checkpoint_transaction, historical_checkpoint)
+                    .into_stored_transaction()
+                    .await
+                    .map(Some)
+            });
+
+        // the first failing transaction aborts the whole batch.
+        future::try_join_all(futures).await
+    }
+
+    /// Fetches objects by their ID and version from historical fallback
+    /// storage.
+    ///
+    /// - If `before_version` is `false`, it looks for the exact version.
+    /// - If `true`, it finds the latest version before the given one.
+    ///
+    /// # Note
+    ///
+    /// Currently only supports `before_version = false`.
+    ///
+    /// Support for `before_version = true` will be added once range scan is
+    /// implemented on the KV REST API.
+    pub(crate) async fn objects(
+        &self,
+        object_refs: &[(ObjectID, SequenceNumber)],
+        before_version: bool,
+    ) -> IndexerResult<Vec<Option<StoredObject>>> {
+        if before_version {
+            // TODO: Implement once range scan is available on KV REST API
+            // For now, we cannot determine the correct previous version without it due to
+            // non-contiguous object versioning:
+            // https://docs.iota.org/developer/iota-101/objects/versioning#move-objects.
+            return Ok(vec![None; object_refs.len()]);
+        }
+
+        let stored_objects = self
+            .client
+            .multi_get_objects(object_refs)
+            .await?
+            .into_iter()
+            .map(|object| object.map(StoredObject::from))
+            .collect();
+
+        Ok(stored_objects)
+    }
+
+    /// Fetches transactions belonging to a specific checkpoint.
+    ///
+    /// Returns transactions in paginated form, supporting both ascending and
+    /// descending order within the checkpoint. The result is empty when the
+    /// checkpoint contents cannot be found or when no transaction follows the
+    /// cursor.
+    ///
+    /// # Pagination Behavior
+    ///
+    /// | cursor     | descending | Result                                      |
+    /// |------------|------------|---------------------------------------------|
+    /// | `None`     | `false`    | Starts from first transaction in checkpoint |
+    /// | `None`     | `true`     | Starts from last transaction in checkpoint  |
+    /// | `Some(tx)` | `false`    | Starts after `tx`, ascending                |
+    /// | `Some(tx)` | `true`     | Starts after `tx`, descending               |
+    pub(crate) async fn checkpoint_transactions(
+        &self,
+        cursor: Option<TransactionDigest>,
+        checkpoint_sequence_number: CheckpointSequenceNumber,
+        limit: usize,
+        is_descending: bool,
+    ) -> IndexerResult<Vec<Option<StoredTransaction>>> {
+        if limit == 0 {
+            return Ok(vec![]);
+        }
+
+        let Some(contents) = self
+            .client
+            .multi_get_checkpoints_contents(&[checkpoint_sequence_number])
+            .await?
+            .into_iter()
+            .next()
+            .flatten()
+        else {
+            return Ok(vec![]);
+        };
+
+        let tx_digests = contents.iter().map(|b| b.transaction);
+
+        // apply ordering
+        let tx_digests = if is_descending {
+            Either::Left(tx_digests.rev())
+        } else {
+            Either::Right(tx_digests)
+        };
+
+        // apply cursor: skip transactions until after the cursor.
+        //
+        // This relies on transactions being ordered within checkpoint contents,
+        // so we can skip until we find the cursor, then skip the cursor itself.
+        let tx_digests = if let Some(cursor) = cursor {
+            Either::Left(
+                tx_digests
+                    .skip_while(move |digest| *digest != cursor)
+                    .skip(1), // skip the cursor itself
+            )
+        } else {
+            Either::Right(tx_digests)
+        };
+
+        // apply limit
+        let tx_digests = tx_digests.take(limit).collect::<Vec<_>>();
+
+        // the cursor was the last transaction (or is not part of this
+        // checkpoint): nothing left to fetch.
+        if tx_digests.is_empty() {
+            return Ok(vec![]);
+        }
+
+        self.transactions(&tx_digests).await
+    }
+
+    /// Fetches events for a specific transaction.
+    ///
+    /// Returns events emitted by the specified transaction, with support for
+    /// cursor-based pagination and ordering.
+    ///
+    /// # Pagination Behavior
+    ///
+    /// Events are indexed by their position in the transaction (event_seq = 0,
+    /// 1, 2, ...).
+    ///
+    /// | cursor      | descending | Result                   |
+    /// |-------------|------------|--------------------------|
+    /// | `None`      | `false`    | Starts from event_seq 0  |
+    /// | `None`      | `true`     | Starts from last event   |
+    /// | `Some(seq)` | `false`    | Starts after event_seq   |
+    /// | `Some(seq)` | `true`     | Starts before event_seq  |
+    ///
+    /// # Errors
+    ///
+    /// Returns [`IndexerError::InvalidArgument`] when the cursor belongs to a
+    /// different transaction than `tx_digest`.
+    pub(crate) async fn events(
+        &self,
+        tx_digest: TransactionDigest,
+        cursor: Option<EventID>,
+        limit: usize,
+        descending_order: bool,
+    ) -> IndexerResult<Vec<IotaEvent>> {
+        if limit == 0 {
+            return Ok(vec![]);
+        }
+
+        // validate cursor if provided
+        let start_seq = if let Some(cursor) = cursor {
+            if cursor.tx_digest != tx_digest {
+                return Err(IndexerError::InvalidArgument(format!(
+                    "Cursor tx_digest {} does not match requested tx_digest {tx_digest}",
+                    cursor.tx_digest
+                )));
+            }
+            Some(cursor.event_seq)
+        } else {
+            None
+        };
+
+        // the position of an event in this list is its event_seq.
+        let indexed_events = self.all_events(tx_digest).await?.into_iter().enumerate();
+
+        // apply ordering, cursor, and limit
+        let events = if descending_order {
+            indexed_events
+                .rev() // reverse for descending
+                .filter(|(idx, _)| start_seq.is_none_or(|seq| (*idx as u64) < seq))
+                .take(limit)
+                .map(|(_, event)| event)
+                .collect()
+        } else {
+            indexed_events
+                .filter(|(idx, _)| start_seq.is_none_or(|seq| (*idx as u64) > seq))
+                .take(limit)
+                .map(|(_, event)| event)
+                .collect()
+        };
+
+        Ok(events)
+    }
+}