Skip to content

Commit e4e59f6

Browse files
committed
feat(indexer): Use historic fallback in IndexerAPI
1 parent a91931e commit e4e59f6

File tree

3 files changed

+192
-87
lines changed

3 files changed

+192
-87
lines changed

crates/iota-indexer/src/errors.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@ pub enum IndexerError {
8282
#[error("Indexer failed to assign TX global order with error: `{0}`")]
8383
PostgresUniqueTxGlobalOrderViolation(String),
8484

85+
#[error("Indexer cannot provide results because of pruned data: `{0}`")]
86+
PostgresDataPruned(String),
87+
8588
#[error("Indexer failed to initialize fullnode Http client with error: `{0}`")]
8689
HttpClientInit(String),
8790

crates/iota-indexer/src/read.rs

Lines changed: 171 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,11 @@ use tap::TapFallible;
5656
use crate::{
5757
apis::GovernanceReadApi,
5858
db::{ConnectionConfig, ConnectionPool, ConnectionPoolConfig},
59-
errors::IndexerError,
59+
errors::{Context, IndexerError},
60+
historical_fallback::reader::HistoricalFallbackReader,
6061
models::{
6162
address_metrics::StoredAddressMetrics,
62-
checkpoints::{StoredChainIdentifier, StoredCheckpoint},
63+
checkpoints::{StoredChainIdentifier, StoredCheckpoint, StoredCpTx},
6364
display::StoredDisplay,
6465
epoch::StoredEpochInfo,
6566
events::StoredEvent,
@@ -98,6 +99,7 @@ pub struct IndexerReader {
9899
pool: ConnectionPool,
99100
package_resolver: PackageResolver,
100101
obj_type_cache: Arc<Mutex<SizedCache<String, Option<ObjectID>>>>,
102+
kv_reader: Option<Arc<HistoricalFallbackReader>>,
101103
}
102104

103105
pub type PackageResolver = Arc<Resolver<PackageStoreWithLruCache<IndexerStorePackageResolver>>>;
@@ -113,6 +115,7 @@ impl IndexerReader {
113115
pool,
114116
package_resolver,
115117
obj_type_cache,
118+
kv_reader: None,
116119
}
117120
}
118121

@@ -1014,14 +1017,37 @@ impl IndexerReader {
10141017
})
10151018
}
10161019

1017-
async fn query_transaction_blocks_by_checkpoint_impl(
1020+
async fn get_lowest_unpruned_checkpoint(&self) -> IndexerResult<StoredCpTx> {
1021+
let pool = self.get_pool();
1022+
// This should be replaced with reading the "watermarks" table once it is used
1023+
// by the pruner
1024+
run_query_async!(&pool, |conn| {
1025+
pruner_cp_watermark::dsl::pruner_cp_watermark
1026+
.order_by(pruner_cp_watermark::checkpoint_sequence_number.asc())
1027+
.first::<StoredCpTx>(conn)
1028+
})
1029+
}
1030+
1031+
async fn query_transaction_blocks_from_db_by_checkpoint_impl(
10181032
&self,
10191033
checkpoint_seq: u64,
1020-
options: iota_json_rpc_types::IotaTransactionBlockResponseOptions,
1021-
cursor_tx_seq: Option<i64>,
1034+
cursor: Option<TransactionDigest>,
10221035
limit: usize,
10231036
is_descending: bool,
1024-
) -> IndexerResult<Vec<IotaTransactionBlockResponse>> {
1037+
) -> IndexerResult<Vec<StoredTransaction>> {
1038+
let lowest_unpruned_cp = self.get_lowest_unpruned_checkpoint().await?;
1039+
if checkpoint_seq < lowest_unpruned_cp.checkpoint_sequence_number as u64 {
1040+
return Err(IndexerError::PostgresDataPruned(format!(
1041+
"requesting data from checkpoint {checkpoint_seq}, lowest not pruned checkpoint is {}",
1042+
lowest_unpruned_cp.checkpoint_sequence_number
1043+
)));
1044+
}
1045+
let cursor_tx_seq = if let Some(cursor) = cursor {
1046+
Some(self.resolve_cursor_tx_digest_to_seq_num(cursor).await?)
1047+
} else {
1048+
None
1049+
};
1050+
10251051
let pool = self.get_pool();
10261052
let tx_range: (i64, i64) = run_query_async!(&pool, move |conn| {
10271053
pruner_cp_watermark::dsl::pruner_cp_watermark
@@ -1053,12 +1079,9 @@ impl IndexerReader {
10531079
query = query.order(transactions::dsl::tx_sequence_number.asc());
10541080
}
10551081
let pool = self.get_pool();
1056-
let stored_txes = run_query_async!(&pool, move |conn| query
1082+
run_query_async!(&pool, move |conn| query
10571083
.limit(limit as i64)
1058-
.load::<StoredTransaction>(conn))?;
1059-
1060-
self.stored_transaction_to_transaction_block(stored_txes, options)
1061-
.await
1084+
.load::<StoredTransaction>(conn))
10621085
}
10631086

10641087
pub async fn query_transaction_blocks_in_blocking_task(
@@ -1097,6 +1120,58 @@ impl IndexerReader {
10971120
.await
10981121
}
10991122

1123+
async fn query_transaction_blocks_from_given_checkpoint(
1124+
&self,
1125+
checkpoint_seq: u64,
1126+
cursor: Option<TransactionDigest>,
1127+
limit: usize,
1128+
is_descending: bool,
1129+
options: iota_json_rpc_types::IotaTransactionBlockResponseOptions,
1130+
) -> IndexerResult<Vec<IotaTransactionBlockResponse>> {
1131+
let db_res = self
1132+
.query_transaction_blocks_from_db_by_checkpoint_impl(
1133+
checkpoint_seq,
1134+
cursor,
1135+
limit,
1136+
is_descending,
1137+
)
1138+
.await;
1139+
let stored_txs = if let (Err(IndexerError::PostgresDataPruned(err)), Some(kv_reader)) =
1140+
(db_res.as_ref(), self.kv_reader.as_ref())
1141+
{
1142+
let fallback_reason = format!("fallback triggered by {err}");
1143+
let txs = kv_reader
1144+
.checkpoint_transactions(cursor, checkpoint_seq, limit, is_descending)
1145+
.await
1146+
.context(&fallback_reason)?;
1147+
if txs.iter().any(|tx| tx.is_none()) {
1148+
return Err(IndexerError::Generic(format!(
1149+
"Historic Fallback failed, KV doesn't have full data for checkpoint {checkpoint_seq}, {fallback_reason}"
1150+
)));
1151+
}
1152+
txs.into_iter().flatten().collect::<Vec<_>>()
1153+
} else {
1154+
db_res?
1155+
};
1156+
self.stored_transaction_to_transaction_block(stored_txs, options)
1157+
.await
1158+
}
1159+
1160+
async fn resolve_cursor_tx_digest_to_seq_num(
1161+
&self,
1162+
cursor: TransactionDigest,
1163+
) -> IndexerResult<i64> {
1164+
let pool = self.get_pool();
1165+
run_query_async!(&pool, move |conn| {
1166+
tx_digests::table
1167+
.select(tx_digests::tx_sequence_number)
1168+
// we filter the tx_digests table because it is indexed by digest,
1169+
// transactions (and other tables) are not
1170+
.filter(tx_digests::tx_digest.eq(cursor.into_inner().to_vec()))
1171+
.first::<i64>(conn)
1172+
})
1173+
}
1174+
11001175
async fn query_transaction_blocks_impl_with_checkpointed_data_only(
11011176
&self,
11021177
filter: Option<TransactionFilterKind>,
@@ -1105,17 +1180,24 @@ impl IndexerReader {
11051180
limit: usize,
11061181
is_descending: bool,
11071182
) -> IndexerResult<Vec<IotaTransactionBlockResponse>> {
1183+
match filter {
1184+
Some(TransactionFilterKind::V1(TransactionFilter::Checkpoint(seq)))
1185+
| Some(TransactionFilterKind::V2(TransactionFilterV2::Checkpoint(seq))) => {
1186+
return self
1187+
.query_transaction_blocks_from_given_checkpoint(
1188+
seq,
1189+
cursor,
1190+
limit,
1191+
is_descending,
1192+
options,
1193+
)
1194+
.await;
1195+
}
1196+
_ => {} // remaining cases will be handled below
1197+
};
1198+
11081199
let cursor_tx_seq = if let Some(cursor) = cursor {
1109-
let pool = self.get_pool();
1110-
let tx_seq = run_query_async!(&pool, move |conn| {
1111-
tx_digests::table
1112-
.select(tx_digests::tx_sequence_number)
1113-
// we filter the tx_digests table because it is indexed by digest,
1114-
// transactions (and other tables) are not
1115-
.filter(tx_digests::tx_digest.eq(cursor.into_inner().to_vec()))
1116-
.first::<i64>(conn)
1117-
})?;
1118-
Some(tx_seq)
1200+
Some(self.resolve_cursor_tx_digest_to_seq_num(cursor).await?)
11191201
} else {
11201202
None
11211203
};
@@ -1129,19 +1211,12 @@ impl IndexerReader {
11291211
"".to_string()
11301212
};
11311213
let order_str = if is_descending { "DESC" } else { "ASC" };
1214+
11321215
let (table_name, main_where_clause) = match filter {
11331216
// Processed above
1134-
Some(TransactionFilterKind::V1(TransactionFilter::Checkpoint(seq)))
1135-
| Some(TransactionFilterKind::V2(TransactionFilterV2::Checkpoint(seq))) => {
1136-
return self
1137-
.query_transaction_blocks_by_checkpoint_impl(
1138-
seq,
1139-
options,
1140-
cursor_tx_seq,
1141-
limit,
1142-
is_descending,
1143-
)
1144-
.await;
1217+
Some(TransactionFilterKind::V1(TransactionFilter::Checkpoint(_)))
1218+
| Some(TransactionFilterKind::V2(TransactionFilterV2::Checkpoint(_))) => {
1219+
unreachable!("handled in earlier match statement")
11451220
}
11461221
// FIXME: sanitize module & function
11471222
Some(TransactionFilterKind::V1(TransactionFilter::MoveFunction {
@@ -1524,35 +1599,65 @@ impl IndexerReader {
15241599
limit: usize,
15251600
descending_order: bool,
15261601
) -> IndexerResult<Vec<IotaEvent>> {
1527-
let ckpt_events = self
1528-
.query_events_by_tx_digest_checkpointed(tx_digest, cursor, limit, descending_order)
1529-
.await?;
1602+
let db_res = self
1603+
.query_events_from_db_by_tx_digest_checkpointed(
1604+
tx_digest,
1605+
cursor,
1606+
limit,
1607+
descending_order,
1608+
)
1609+
.await;
15301610

1531-
let mut iota_event_futures = vec![];
1532-
for stored_event in ckpt_events {
1533-
iota_event_futures.push(tokio::task::spawn(
1534-
stored_event.try_into_iota_event(self.package_resolver.clone()),
1535-
));
1536-
}
1611+
if let (Err(IndexerError::PostgresDataPruned(err)), Some(kv_reader)) =
1612+
(db_res.as_ref(), self.kv_reader.as_ref())
1613+
{
1614+
let fallback_reason = format!("fallback triggered by {err}");
1615+
kv_reader
1616+
.transaction_events(tx_digest, cursor, limit, descending_order)
1617+
.await
1618+
.context(&fallback_reason)
1619+
} else {
1620+
let mut iota_event_futures = vec![];
1621+
for stored_event in db_res? {
1622+
iota_event_futures.push(tokio::task::spawn(
1623+
stored_event.try_into_iota_event(self.package_resolver.clone()),
1624+
));
1625+
}
15371626

1538-
let iota_events = futures::future::join_all(iota_event_futures)
1539-
.await
1540-
.into_iter()
1541-
.collect::<Result<Vec<_>, _>>()
1542-
.tap_err(|e| tracing::error!("failed to join iota event futures: {e}"))?
1543-
.into_iter()
1544-
.collect::<Result<Vec<_>, _>>()
1545-
.tap_err(|e| tracing::error!("failed to collect iota event futures: {e}"))?;
1546-
Ok(iota_events)
1627+
futures::future::join_all(iota_event_futures)
1628+
.await
1629+
.into_iter()
1630+
.collect::<Result<Vec<_>, _>>()
1631+
.tap_err(|e| tracing::error!("failed to join iota event futures: {e}"))?
1632+
.into_iter()
1633+
.collect::<Result<Vec<_>, _>>()
1634+
.tap_err(|e| tracing::error!("failed to collect iota event futures: {e}"))
1635+
}
15471636
}
15481637

1549-
async fn query_events_by_tx_digest_checkpointed(
1638+
async fn query_events_from_db_by_tx_digest_checkpointed(
15501639
&self,
15511640
tx_digest: TransactionDigest,
15521641
cursor: Option<EventID>,
15531642
limit: usize,
15541643
descending_order: bool,
15551644
) -> IndexerResult<Vec<StoredEvent>> {
1645+
let tx_seq = self
1646+
.resolve_cursor_tx_digest_to_seq_num(tx_digest)
1647+
.await
1648+
.map_err(|err| {
1649+
IndexerError::PostgresDataPruned(format!(
1650+
"Data for tx {tx_digest} potentially pruned: {err}"
1651+
))
1652+
})?;
1653+
let lowest_unpruned_cp = self.get_lowest_unpruned_checkpoint().await?;
1654+
if tx_seq < lowest_unpruned_cp.min_tx_sequence_number {
1655+
return Err(IndexerError::PostgresDataPruned(format!(
1656+
"requesting data for tx seq {tx_seq}, lowest not pruned tx seq is {}",
1657+
lowest_unpruned_cp.min_tx_sequence_number
1658+
)));
1659+
}
1660+
15561661
let mut query = events::table.into_boxed();
15571662

15581663
if let Some(cursor) = cursor {
@@ -1578,14 +1683,7 @@ impl IndexerReader {
15781683
query = query.order(events::event_sequence_number.asc());
15791684
}
15801685

1581-
query = query.filter(
1582-
events::tx_sequence_number.nullable().eq(tx_digests::table
1583-
.select(tx_digests::tx_sequence_number)
1584-
// we filter the tx_digests table because it is indexed by digest,
1585-
// events table is not
1586-
.filter(tx_digests::tx_digest.eq(tx_digest.into_inner().to_vec()))
1587-
.single_value()),
1588-
);
1686+
query = query.filter(events::tx_sequence_number.nullable().eq(tx_seq));
15891687

15901688
let pool = self.get_pool();
15911689
run_query_async!(&pool, move |conn| {
@@ -1600,27 +1698,23 @@ impl IndexerReader {
16001698
limit: usize,
16011699
descending_order: bool,
16021700
) -> IndexerResult<Vec<IotaEvent>> {
1603-
let pool = self.get_pool();
1701+
if let EventFilter::Transaction(tx_digest) = filter {
1702+
return self
1703+
.query_events_by_tx_digest_checkpointed_only(
1704+
tx_digest,
1705+
cursor,
1706+
limit,
1707+
descending_order,
1708+
)
1709+
.await;
1710+
}
1711+
16041712
let (tx_seq, event_seq) = if let Some(cursor) = cursor {
16051713
let EventID {
16061714
tx_digest,
16071715
event_seq,
16081716
} = cursor;
1609-
let tx_seq = run_query_async!(&pool, move |conn| {
1610-
transactions::dsl::transactions
1611-
.select(transactions::tx_sequence_number)
1612-
.filter(
1613-
transactions::tx_sequence_number
1614-
.nullable()
1615-
.eq(tx_digests::table
1616-
.select(tx_digests::tx_sequence_number)
1617-
// we filter the tx_digests table because it is indexed by digest,
1618-
// transactions table is not
1619-
.filter(tx_digests::tx_digest.eq(tx_digest.into_inner().to_vec()))
1620-
.single_value()),
1621-
)
1622-
.first::<i64>(conn)
1623-
})?;
1717+
let tx_seq: i64 = self.resolve_cursor_tx_digest_to_seq_num(tx_digest).await?;
16241718
(tx_seq, event_seq as i64)
16251719
} else if descending_order {
16261720
let max_tx_seq = i64::MAX;
@@ -1662,15 +1756,8 @@ impl IndexerReader {
16621756
order_clause,
16631757
limit,
16641758
)
1665-
} else if let EventFilter::Transaction(tx_digest) = filter {
1666-
return self
1667-
.query_events_by_tx_digest_checkpointed_only(
1668-
tx_digest,
1669-
cursor,
1670-
limit,
1671-
descending_order,
1672-
)
1673-
.await;
1759+
} else if let EventFilter::Transaction(_) = filter {
1760+
unreachable!("case handled earlier in the function")
16741761
} else {
16751762
let main_where_clause = match filter {
16761763
EventFilter::Package(package_id) => {

0 commit comments

Comments
 (0)