Skip to content

Commit

Permalink
feat(indexer): new JRPC method to list substates (#1049)
Browse files Browse the repository at this point in the history
Description
---
* Added new columns `template_address`, `module_name` and `timestamp` to
the `substates` SQLite table.
* Added new column `timestamp` to the `events` SQLite table. For now
it's populated but not used by any method yet.
* The `event_scanner` fetches and stores in database the values for all
the new fields in substates and events
* New JPRC method `list_substates`
* Updated TypeScript bindings related with the new indexer method
* Removed old logic for event storing in the `event_manager`, it's all
now done only on the `event_scanner` and controlled by the event filter
in the indexer config

Motivation and Context
---
On #1043 we refactored the
Indexer to automatically store all substates related to events. This PR
follows on that work and adds a new JRPC method `list_substates` that
allows to fetch and filter all the substates stored in the Indexer
database. It also adds some metadata (`template_address`, `module_name`,
`timestamp`) to substates.

The new method JRPC method `list_substates` has the following _optional_
parameters:
* **filter_by_template**: String with the template address of the
components that we want to filter
* **filter_by_type**: String with the only type of substate that we
want. Possible vaules are `"Component"`, `"Resource"`, `"Vault"`,
`"UnclaimedConfidentialOutput"`, `"NonFungible",` `"TransactionReceipt"`
and `"FeeClaim"`
* **limit**: u64 value for the maximum number of results that we want
(intended for pagination)
* **offset**: u64 with the position of the first result that we want
(intended for pagination)

How Has This Been Tested?
---
Manually by:
* Spawning a local network using `tari_swarm`
* Performing some transactions to generate events and substates
* Inspecting the Indexer database and also querying the new
`list_substate` method

What process can a PR reviewer use to test or verify this change?
---
See previous section

Breaking Changes
---

- [x] None
- [ ] Requires data directory to be deleted
- [ ] Other - Please specify
  • Loading branch information
mrnaveira committed Jun 19, 2024
1 parent a4fd7c2 commit 03a72be
Show file tree
Hide file tree
Showing 19 changed files with 353 additions and 86 deletions.
18 changes: 3 additions & 15 deletions applications/tari_indexer/src/event_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ impl EventManager {
topic: String,
payload: &Metadata,
version: u64,
timestamp: u64,
) -> Result<(), anyhow::Error> {
let mut tx = self.substate_store.create_write_tx()?;
let new_event = NewEvent {
Expand All @@ -84,6 +85,7 @@ impl EventManager {
topic,
payload: payload.to_json().expect("Failed to convert to JSON"),
version: version as i32,
timestamp: timestamp as i64,
};
tx.save_event(new_event)?;
tx.commit()?;
Expand Down Expand Up @@ -165,24 +167,10 @@ impl EventManager {
.substate_scanner
.get_events_for_substate(&substate_id, Some(version))
.await?;

// stores the newest network events to the db
// because the same substate_id with different version
// can be processed in the same transaction, we need to avoid
// duplicates
for (version, event) in network_events {
let template_address = event.template_address();
let tx_hash = TransactionId::new(event.tx_hash().into_array());
let topic = event.topic();
let payload = event.payload();
self.save_event_to_db(
&substate_id,
template_address,
tx_hash,
topic,
payload,
u64::from(version),
)?;
for (_, event) in network_events {
events.push(event);
}

Expand Down
62 changes: 45 additions & 17 deletions applications/tari_indexer/src/event_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{
collections::{HashMap, HashSet},
str::FromStr,
};
use std::{collections::HashMap, str::FromStr};

use futures::StreamExt;
use log::*;
Expand All @@ -36,7 +33,7 @@ use tari_dan_storage::consensus_models::{Block, BlockId, Decision, TransactionRe
use tari_engine_types::{
commit_result::{ExecuteResult, TransactionResult},
events::Event,
substate::{Substate, SubstateId},
substate::{Substate, SubstateId, SubstateValue},
};
use tari_epoch_manager::EpochManagerReader;
use tari_template_lib::models::{EntityId, TemplateAddress};
Expand Down Expand Up @@ -90,6 +87,12 @@ impl TryFrom<EventFilterConfig> for EventFilter {
}
}

#[derive(Default, Debug, Clone, PartialEq, Eq)]
struct TransactionMetadata {
pub transaction_id: TransactionId,
pub timestamp: u64,
}

pub struct EventScanner {
network: Network,
epoch_manager: Box<dyn EpochManagerReader<Addr = PeerAddress>>,
Expand Down Expand Up @@ -124,8 +127,6 @@ impl EventScanner {
let mut event_count = 0;

let current_epoch = self.epoch_manager.current_epoch().await?;
// let network_committee_info = self.epoch_manager.get_network_committees().await?;
// let epoch = network_committee_info.epoch;
let current_committees = self.epoch_manager.get_committees(current_epoch).await?;
for (shard, mut committee) in current_committees {
info!(
Expand All @@ -134,7 +135,6 @@ impl EventScanner {
current_epoch,
shard
);
// TODO: use the latest block id that we scanned for each committee
let new_blocks = self
.get_new_blocks_from_committee(shard, &mut committee, current_epoch)
.await?;
Expand All @@ -143,16 +143,16 @@ impl EventScanner {
"Scanned {} blocks",
new_blocks.len()
);
let transaction_ids = self.extract_transaction_ids_from_blocks(new_blocks);
let transactions = self.extract_transactions_from_blocks(new_blocks);
info!(
target: LOG_TARGET,
"Scanned {} transactions",
transaction_ids.len()
transactions.len()
);

for transaction_id in transaction_ids {
for transaction in transactions {
// fetch all the events in the transaction
let events = self.get_events_for_transaction(transaction_id).await?;
let events = self.get_events_for_transaction(transaction.transaction_id).await?;
event_count += events.len();

// only keep the events specified by the indexer filter
Expand All @@ -163,7 +163,7 @@ impl EventScanner {
"Filtered events: {}",
filtered_events.len()
);
self.store_events_in_db(&filtered_events).await?;
self.store_events_in_db(&filtered_events, transaction).await?;
}
}

Expand Down Expand Up @@ -223,7 +223,11 @@ impl EventScanner {
}
}

async fn store_events_in_db(&self, events_data: &Vec<EventData>) -> Result<(), anyhow::Error> {
async fn store_events_in_db(
&self,
events_data: &Vec<EventData>,
transaction: TransactionMetadata,
) -> Result<(), anyhow::Error> {
let mut tx = self.substate_store.create_write_tx()?;

for data in events_data {
Expand All @@ -234,6 +238,7 @@ impl EventScanner {
payload: data.event.payload().to_json().expect("Failed to convert to JSON"),
substate_id: data.event.substate_id().map(|s| s.to_string()),
version: 0_i32,
timestamp: transaction.timestamp as i64,
};

// TODO: properly avoid or handle duplicated events
Expand All @@ -258,10 +263,16 @@ impl EventScanner {

// store/update the related substate if any
if let (Some(substate_id), Some(substate)) = (data.event.substate_id(), &data.substate) {
let template_address = Self::extract_template_address_from_substate(substate).map(|t| t.to_string());
let module_name = Self::extract_module_name_from_substate(substate);
let substate_row = NewSubstate {
address: substate_id.to_string(),
version: i64::from(substate.version()),
data: Self::encode_substate(substate)?,
tx_hash: data.event.tx_hash().to_string(),
template_address,
module_name,
timestamp: transaction.timestamp as i64,
};
info!(
target: LOG_TARGET,
Expand All @@ -277,6 +288,20 @@ impl EventScanner {
Ok(())
}

fn extract_template_address_from_substate(substate: &Substate) -> Option<TemplateAddress> {
match substate.substate_value() {
SubstateValue::Component(c) => Some(c.template_address),
_ => None,
}
}

fn extract_module_name_from_substate(substate: &Substate) -> Option<String> {
match substate.substate_value() {
SubstateValue::Component(c) => Some(c.module_name.to_owned()),
_ => None,
}
}

fn encode_substate(substate: &Substate) -> Result<String, anyhow::Error> {
let pretty_json = serde_json::to_string_pretty(&substate)?;
Ok(pretty_json)
Expand Down Expand Up @@ -373,11 +398,14 @@ impl EventScanner {
}
}

fn extract_transaction_ids_from_blocks(&self, blocks: Vec<Block>) -> HashSet<TransactionId> {
fn extract_transactions_from_blocks(&self, blocks: Vec<Block>) -> Vec<TransactionMetadata> {
blocks
.iter()
.flat_map(|b| b.all_accepted_transactions_ids())
.copied()
.flat_map(|b| b.all_accepted_transactions_ids().map(|id| (id, b.timestamp())))
.map(|(transaction_id, timestamp)| TransactionMetadata {
transaction_id: *transaction_id,
timestamp,
})
.collect()
}

Expand Down
2 changes: 2 additions & 0 deletions applications/tari_indexer/src/graphql/model/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ impl EventQuery {
topic: String,
payload: String,
version: u64,
timestamp: u64,
) -> Result<Event, anyhow::Error> {
info!(
target: LOG_TARGET,
Expand All @@ -185,6 +186,7 @@ impl EventQuery {
topic.clone(),
&payload,
version,
timestamp,
)?;

Ok(Event {
Expand Down
91 changes: 56 additions & 35 deletions applications/tari_indexer/src/json_rpc/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,41 +44,41 @@ use tari_dan_engine::{template::TemplateModuleLoader, wasm::WasmModule};
use tari_dan_p2p::TariMessagingSpec;
use tari_dan_storage::consensus_models::Decision;
use tari_epoch_manager::{base_layer::EpochManagerHandle, EpochManagerReader};
use tari_indexer_client::{
types,
types::{
AddPeerRequest,
AddPeerResponse,
ConnectionDirection,
GetAllVnsRequest,
GetAllVnsResponse,
GetCommsStatsResponse,
GetConnectionsResponse,
GetEpochManagerStatsResponse,
GetIdentityResponse,
GetNonFungibleCollectionsResponse,
GetNonFungibleCountRequest,
GetNonFungibleCountResponse,
GetNonFungiblesRequest,
GetNonFungiblesResponse,
GetRelatedTransactionsRequest,
GetRelatedTransactionsResponse,
GetSubstateRequest,
GetSubstateResponse,
GetTemplateDefinitionRequest,
GetTemplateDefinitionResponse,
GetTransactionResultRequest,
GetTransactionResultResponse,
IndexerTransactionFinalizedResult,
InspectSubstateRequest,
InspectSubstateResponse,
ListTemplatesRequest,
ListTemplatesResponse,
NonFungibleSubstate,
SubmitTransactionRequest,
SubmitTransactionResponse,
TemplateMetadata,
},
use tari_indexer_client::types::{
self,
AddPeerRequest,
AddPeerResponse,
ConnectionDirection,
GetAllVnsRequest,
GetAllVnsResponse,
GetCommsStatsResponse,
GetConnectionsResponse,
GetEpochManagerStatsResponse,
GetIdentityResponse,
GetNonFungibleCollectionsResponse,
GetNonFungibleCountRequest,
GetNonFungibleCountResponse,
GetNonFungiblesRequest,
GetNonFungiblesResponse,
GetRelatedTransactionsRequest,
GetRelatedTransactionsResponse,
GetSubstateRequest,
GetSubstateResponse,
GetTemplateDefinitionRequest,
GetTemplateDefinitionResponse,
GetTransactionResultRequest,
GetTransactionResultResponse,
IndexerTransactionFinalizedResult,
InspectSubstateRequest,
InspectSubstateResponse,
ListSubstatesRequest,
ListSubstatesResponse,
ListTemplatesRequest,
ListTemplatesResponse,
NonFungibleSubstate,
SubmitTransactionRequest,
SubmitTransactionResponse,
TemplateMetadata,
};
use tari_networking::{is_supported_multiaddr, NetworkingHandle, NetworkingService};
use tari_validator_node_rpc::client::{SubstateResult, TariValidatorNodeRpcClientFactory, TransactionResultStatus};
Expand Down Expand Up @@ -261,6 +261,27 @@ impl JsonRpcHandlers {
}))
}

pub async fn list_substates(&self, value: JsonRpcExtractor) -> JrpcResult {
let answer_id = value.get_answer_id();
let ListSubstatesRequest {
filter_by_template,
filter_by_type,
limit,
offset,
} = value.parse_params()?;

let substates = self
.substate_manager
.list_substates(filter_by_type, filter_by_template, limit, offset)
.await
.map_err(|e| {
warn!(target: LOG_TARGET, "Error getting substate: {}", e);
Self::internal_error(answer_id, format!("Error getting substate: {}", e))
})?;

Ok(JsonRpcResponse::success(answer_id, ListSubstatesResponse { substates }))
}

pub async fn get_substate(&self, value: JsonRpcExtractor) -> JrpcResult {
let answer_id = value.get_answer_id();
let request: GetSubstateRequest = value.parse_params()?;
Expand Down
1 change: 1 addition & 0 deletions applications/tari_indexer/src/json_rpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ async fn handler(Extension(handlers): Extension<Arc<JsonRpcHandlers>>, value: Js
"get_all_vns" => handlers.get_all_vns(value).await,
"add_peer" => handlers.add_peer(value).await,
"get_comms_stats" => handlers.get_comms_stats(value).await,
"list_substates" => handlers.list_substates(value).await,
"get_substate" => handlers.get_substate(value).await,
"inspect_substate" => handlers.inspect_substate(value).await,
"get_connections" => handlers.get_connections(value).await,
Expand Down
16 changes: 15 additions & 1 deletion applications/tari_indexer/src/substate_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ use tari_dan_app_utilities::substate_file_cache::SubstateFileCache;
use tari_dan_common_types::PeerAddress;
use tari_engine_types::substate::{Substate, SubstateId};
use tari_epoch_manager::base_layer::EpochManagerHandle;
use tari_indexer_client::types::{ListSubstateItem, SubstateType};
use tari_indexer_lib::{substate_scanner::SubstateScanner, NonFungibleSubstate};
use tari_template_lib::models::TemplateAddress;
use tari_transaction::TransactionId;
use tari_validator_node_rpc::client::{SubstateResult, TariValidatorNodeRpcClientFactory};

Expand Down Expand Up @@ -88,12 +90,24 @@ impl SubstateManager {
}
}

pub async fn list_substates(
&self,
filter_by_type: Option<SubstateType>,
filter_by_template: Option<TemplateAddress>,
limit: Option<u64>,
offset: Option<u64>,
) -> Result<Vec<ListSubstateItem>, anyhow::Error> {
let mut tx = self.substate_store.create_read_tx()?;
let substates = tx.list_substates(filter_by_type, filter_by_template, limit, offset)?;
Ok(substates)
}

pub async fn get_substate(
&self,
substate_address: &SubstateId,
version: Option<u32>,
) -> Result<Option<SubstateResponse>, anyhow::Error> {
// we store the latest version of the substates in the watchlist,
// we store the latest version of the substates related to the events
// so we will return the substate directly from database if it's there
if let Some(substate) = self.get_substate_from_db(substate_address, version).await? {
return Ok(Some(substate));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- This file should undo anything in `up.sql`
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- The transaction hash column will be similar to the one in the "events" table
alter table substates
drop column transaction_hash;
alter table substates
add column tx_hash text not NULL;

-- Tempalte address for component substates
alter table substates
add column template_address text NULL;

-- Name of the template module for components
alter table substates
add column module_name text NULL;

-- block timestamp for events and substates
alter table events
add column timestamp bigint not NULL;
alter table substates
add column timestamp bigint not NULL;
Loading

0 comments on commit 03a72be

Please sign in to comment.