Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion crates/iota-indexer/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use iota_data_ingestion_core::IngestionError;
use iota_json_rpc_api::{error_object_from_rpc, internal_error};
use iota_names::error::IotaNamesError;
use iota_types::{
base_types::ObjectIDParseError,
base_types::{ObjectID, ObjectIDParseError, SequenceNumber},
error::{IotaError, IotaObjectResponseError, UserInputError},
};
use jsonrpsee::{core::ClientError as RpcError, types::ErrorObjectOwned};
Expand Down Expand Up @@ -153,6 +153,14 @@ pub enum IndexerError {

#[error("Transaction dependencies have not been indexed")]
TransactionDependenciesNotIndexed,

#[error("historical fallback object not found: id {object_id}, version {version}")]
HistoricalFallbackObjectNotFound {
object_id: ObjectID,
version: SequenceNumber,
},
#[error("historical fallback storage error: {0}")]
HistoricalFallbackStorageError(String),
}

pub type IndexerResult<T> = Result<T, IndexerError>;
Expand Down
7 changes: 2 additions & 5 deletions crates/iota-indexer/src/historical_fallback/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright (c) 2025 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

//! Module containing the client for interacting with the REST API KV server.

use std::str::FromStr;

use bytes::Bytes;
Expand All @@ -26,7 +28,6 @@ use tracing::{error, info, instrument, trace, warn};

use crate::errors::IndexerResult;

#[expect(dead_code)]
pub(crate) trait KeyValueStoreClient {
async fn multi_get_transactions(
&self,
Expand Down Expand Up @@ -75,7 +76,6 @@ pub(crate) struct HttpRestKVClient {
}

impl HttpRestKVClient {
#[expect(dead_code)]
pub fn new(base_url: &str) -> IndexerResult<Self> {
info!("creating HttpRestKVClient with base_url: {}", base_url);

Expand All @@ -98,7 +98,6 @@ impl HttpRestKVClient {
Ok(Url::from_str(joined.as_str())?)
}

#[expect(dead_code)]
async fn multi_fetch(&self, uris: Vec<Key>) -> Vec<IndexerResult<Option<Bytes>>> {
let len = uris.len();
let fetches = stream::iter(uris.into_iter().map(|url| self.fetch(url)));
Expand Down Expand Up @@ -138,7 +137,6 @@ where
.ok()
}

#[expect(dead_code)]
fn map_fetch<'a, K>(fetch: (&'a IndexerResult<Option<Bytes>>, &'a K)) -> Option<(&'a Bytes, &'a K)>
where
K: std::fmt::Debug,
Expand All @@ -154,7 +152,6 @@ where
}
}

#[expect(dead_code)]
fn deser_check_digest<T, D>(
digest: &D,
bytes: &Bytes,
Expand Down
49 changes: 22 additions & 27 deletions crates/iota-indexer/src/historical_fallback/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@ use iota_rest_api::CheckpointTransaction;
use iota_types::{
digests::TransactionDigest,
effects::TransactionEvents,
messages_checkpoint::{
CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
},
messages_checkpoint::{CertifiedCheckpointSummary, CheckpointContents},
object::Object,
};
use prometheus::Registry;

use crate::{
errors::IndexerResult,
errors::{IndexerError, IndexerResult},
ingestion::{common::prepare::extract_df_kind, primary::prepare::PrimaryWorker},
metrics::IndexerMetrics,
models::{
Expand All @@ -37,13 +35,13 @@ use crate::{
/// Alias for an [`Object`] fetched from historical fallback storage.
///
/// Contains all data needed to reconstruct a [`StoredObject`].
type HistoricalFallbackObject = Object;
pub(crate) type HistoricalFallbackObject = Object;

/// Alias for [`CertifiedCheckpointSummary`] with its [`CheckpointContents`]
/// data fetched from historical fallback storage.
///
/// Contains all data needed to reconstruct a [`StoredCheckpoint`].
type HistoricalFallbackCheckpoint = (CertifiedCheckpointSummary, CheckpointContents);
pub(crate) type HistoricalFallbackCheckpoint = (CertifiedCheckpointSummary, CheckpointContents);

impl From<HistoricalFallbackObject> for StoredObject {
fn from(object: HistoricalFallbackObject) -> Self {
Expand Down Expand Up @@ -81,7 +79,6 @@ pub struct HistoricalFallbackEvents {
}

impl HistoricalFallbackEvents {
#[expect(dead_code)]
pub fn new(events: TransactionEvents, checkpoint_summary: CertifiedCheckpointSummary) -> Self {
Self {
events,
Expand All @@ -91,7 +88,6 @@ impl HistoricalFallbackEvents {

/// Converts the raw [`TransactionEvents`] into JSON RPC compatible
/// [`IotaEvent`]s.
#[expect(dead_code)]
pub(crate) async fn into_iota_events(
self,
package_resolver: Arc<Resolver<impl PackageStore>>,
Expand All @@ -116,42 +112,41 @@ pub struct HistoricalFallbackTransaction {
/// Checkpointed transaction data.
checkpoint_transaction: CheckpointTransaction,
/// Checkpoint sequence number the transaction is part of.
checkpoint_sequence_number: CheckpointSequenceNumber,
/// Checkpoint timestamp.
timestamp: u64,
historical_checkpoint: HistoricalFallbackCheckpoint,
}

impl HistoricalFallbackTransaction {
#[expect(dead_code)]
pub fn new(
checkpoint_transaction: CheckpointTransaction,
checkpoint_summary: CertifiedCheckpointSummary,
historical_checkpoint: HistoricalFallbackCheckpoint,
) -> Self {
Self {
checkpoint_transaction,
checkpoint_sequence_number: checkpoint_summary.sequence_number,
timestamp: checkpoint_summary.timestamp_ms,
historical_checkpoint,
}
}

/// Converts the historical fallback transaction into a
/// [`StoredTransaction`].
#[expect(dead_code)]
async fn into_stored_transaction(self) -> IndexerResult<StoredTransaction> {
// StoredTransaction::try_into_iota_transaction_block_response implementation
// does not use the `tx_sequence_number`, in this regard it is safe to
// hardcode to 0.
//
// If in future iterations, the `tx_sequence_number` will be needed, by
// importing the CheckpointContents we'll be able to derive it by using the
// CheckpointContents::enumerate_transactions method.
let tx_sequence_number = 0;
pub(crate) async fn into_stored_transaction(self) -> IndexerResult<StoredTransaction> {
let tx_digest = self.checkpoint_transaction.transaction.digest();
let (summary, contents) = self.historical_checkpoint;

let Some(tx_sequence_number) = contents
.enumerate_transactions(&summary)
.find(|(_seq, execution_digest)| &execution_digest.transaction == tx_digest)
.map(|(seq, _execution_digest)| seq)
else {
return Err(IndexerError::HistoricalFallbackStorageError(format!(
"cannot find transaction sequence number to transaction: {tx_digest}"
)));
};

let indexed_tx = PrimaryWorker::index_transaction(
&self.checkpoint_transaction,
tx_sequence_number,
self.checkpoint_sequence_number,
self.timestamp,
summary.sequence_number,
summary.timestamp_ms,
&IndexerMetrics::new(&Registry::new()),
)
.await?;
Expand Down
1 change: 1 addition & 0 deletions crates/iota-indexer/src/historical_fallback/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@

pub(crate) mod client;
pub(crate) mod convert;
pub(crate) mod reader;
Loading
Loading