From 75e4825c890e66fd65d39e751b87f0e4ca00cfcc Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Wed, 22 Nov 2023 22:04:05 -0500 Subject: [PATCH] nft_ingester: use program_transformers crate --- Cargo.lock | 30 +- Cargo.toml | 7 +- nft_ingester/Cargo.toml | 18 +- nft_ingester/src/account_updates.rs | 40 +- nft_ingester/src/error/mod.rs | 50 +- nft_ingester/src/lib.rs | 1 - nft_ingester/src/main.rs | 1 - nft_ingester/src/metrics.rs | 29 +- .../program_transformers/bubblegum/burn.rs | 65 --- .../bubblegum/cancel_redeem.rs | 74 --- .../bubblegum/collection_verification.rs | 80 ---- .../bubblegum/creator_verification.rs | 107 ----- .../src/program_transformers/bubblegum/db.rs | 442 ------------------ .../bubblegum/decompress.rs | 32 -- .../bubblegum/delegate.rs | 72 --- .../src/program_transformers/bubblegum/mod.rs | 108 ----- .../program_transformers/bubblegum/redeem.rs | 59 --- .../bubblegum/transfer.rs | 75 --- nft_ingester/src/program_transformers/mod.rs | 189 -------- .../src/program_transformers/token/mod.rs | 141 ------ .../token_metadata/master_edition.rs | 94 ---- .../token_metadata/mod.rs | 54 --- nft_ingester/src/tasks/common/mod.rs | 54 ++- nft_ingester/src/transaction_notifications.rs | 41 +- program_transformers/src/lib.rs | 8 +- 25 files changed, 119 insertions(+), 1752 deletions(-) delete mode 100644 nft_ingester/src/program_transformers/bubblegum/burn.rs delete mode 100644 nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs delete mode 100644 nft_ingester/src/program_transformers/bubblegum/collection_verification.rs delete mode 100644 nft_ingester/src/program_transformers/bubblegum/creator_verification.rs delete mode 100644 nft_ingester/src/program_transformers/bubblegum/db.rs delete mode 100644 nft_ingester/src/program_transformers/bubblegum/decompress.rs delete mode 100644 nft_ingester/src/program_transformers/bubblegum/delegate.rs delete mode 100644 nft_ingester/src/program_transformers/bubblegum/mod.rs delete mode 100644 nft_ingester/src/program_transformers/bubblegum/redeem.rs delete mode 100644 nft_ingester/src/program_transformers/bubblegum/transfer.rs delete mode 100644 nft_ingester/src/program_transformers/mod.rs delete mode 100644 nft_ingester/src/program_transformers/token/mod.rs delete mode 100644 nft_ingester/src/program_transformers/token_metadata/master_edition.rs delete mode 100644 nft_ingester/src/program_transformers/token_metadata/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 08913f59c..373c3b341 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3420,9 +3420,7 @@ dependencies = [ name = "nft_ingester" version = "0.7.2" dependencies = [ - "anchor-lang", "async-trait", - "base64 0.21.4", "blockbuster", "borsh 0.10.3", "bs58 0.4.0", @@ -3431,46 +3429,32 @@ dependencies = [ "chrono", "clap 4.4.8", "digital_asset_types", - "env_logger 0.10.0", "figment", "flatbuffers", "futures", - "futures-util", - "hex", - "lazy_static", "log", "mpl-bubblegum", - "num-integer", - "num-traits", "plerkle_messenger", "plerkle_serialization", + "program_transformers", "rand 0.8.5", "redis", - "regex", "reqwest", "rust-crypto", "sea-orm", - "sea-query 0.28.5", "serde", "serde_json", "solana-account-decoder", "solana-client", - "solana-geyser-plugin-interface", "solana-sdk", - "solana-sdk-macro", "solana-transaction-status", "spl-account-compression", - "spl-concurrent-merkle-tree", - "spl-token 4.0.0", "sqlx", "stretto", "thiserror", "tokio", - "tokio-postgres", - "tokio-stream", "tracing-subscriber", "url", - "uuid", ] [[package]] @@ -5757,18 +5741,6 @@ dependencies = [ "thiserror", ] -[[package]] -name = "solana-geyser-plugin-interface" -version = "1.16.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "512372f741974113777872cecd560beee189cf79c3fb2eb84e1c28be69f011d1" -dependencies = [ - "log", - "solana-sdk", - "solana-transaction-status", - "thiserror", -] - [[package]] name = "solana-logger" version = "1.16.16" diff --git a/Cargo.toml b/Cargo.toml index a5d08f3cb..1c967736e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,6 @@ anchor-lang = "0.28.0" anyhow = "1.0.75" async-std = "1.0.0" async-trait = "0.1.60" -base64 = "0.21.0" blockbuster = "0.9.0-beta.1" borsh = "~0.10.3" bs58 = "0.4.0" @@ -43,7 +42,6 @@ fake = "2.5.0" figment = "0.10.8" flatbuffers = "23.1.21" futures = "0.3.28" -futures-util = "0.3.27" hex = "0.4.3" hyper = "0.14.23" indexmap = "1.9.3" @@ -60,12 +58,12 @@ mpl-candy-machine-core = "2.0.1" mpl-token-metadata = "=2.0.0-beta.1" nft_ingester = { path = "nft_ingester" } num-derive = "0.3.3" -num-integer = { version = "0.1.44", default_features = false } num-traits = "0.2.15" open-rpc-derive = "0.0.4" open-rpc-schema = "0.0.4" plerkle_messenger = "1.6.0" plerkle_serialization = "1.6.0" +program_transformers = { path = "program_transformers" } prometheus = "0.13.3" proxy-wasm = "0.2.0" rand = "0.8.5" @@ -82,10 +80,8 @@ serde = "1.0.137" serde_json = "1.0.81" solana-account-decoder = "~1.16.16" solana-client = "~1.16.16" -solana-geyser-plugin-interface = "~1.16.16" solana-program = "~1.16.16" solana-sdk = "~1.16.16" -solana-sdk-macro = "~1.16.16" solana-transaction-status = "~1.16.16" spl-account-compression = "0.2.0" spl-associated-token-account = ">= 1.1.3, < 3.0" @@ -104,7 +100,6 @@ tracing = "0.1.35" tracing-subscriber = "0.3.16" txn_forwarder = { path = "tools/txn_forwarder" } url = "2.3.1" -uuid = "1.0.0" wasi = "0.7.0" wasm-bindgen = "0.2.83" diff --git a/nft_ingester/Cargo.toml b/nft_ingester/Cargo.toml index 05b98b563..f31a589dd 100644 --- a/nft_ingester/Cargo.toml +++ b/nft_ingester/Cargo.toml @@ -6,9 +6,7 @@ repository = { workspace = true } publish = { workspace = true } [dependencies] -anchor-lang = { workspace = true } async-trait = { workspace = true } -base64 = { workspace = true } blockbuster = { workspace = true } borsh = { workspace = true } bs58 = { workspace = true } @@ -17,46 +15,32 @@ cadence-macros = { workspace = true } chrono = { workspace = true } clap = { workspace = true, features = ["derive", "cargo"] } digital_asset_types = { workspace = true, features = ["json_types", "sql_types"] } -env_logger = { workspace = true } figment = { workspace = true, features = ["env", "toml", "yaml"] } flatbuffers = { workspace = true } futures = { workspace = true } -futures-util = { workspace = true } -hex = { workspace = true } -lazy_static = { workspace = true } log = { workspace = true } mpl-bubblegum = { workspace = true } -num-integer = { workspace = true } -num-traits = { workspace = true } plerkle_messenger = { workspace = true, features = ["redis"] } plerkle_serialization = { workspace = true } +program_transformers = { workspace = true } rand = { workspace = true } redis = { workspace = true, features = ["aio", "tokio-comp", "streams", "tokio-native-tls-comp"] } -regex = { workspace = true } reqwest = { workspace = true } rust-crypto = { workspace = true } sea-orm = { workspace = true, features = ["macros", "runtime-tokio-rustls", "sqlx-postgres", "with-chrono", "mock"] } -sea-query = { workspace = true, features = ["postgres-array"] } serde = { workspace = true } serde_json = { workspace = true } solana-account-decoder = { workspace = true } solana-client = { workspace = true } -solana-geyser-plugin-interface = { workspace = true } solana-sdk = { workspace = true } -solana-sdk-macro = { workspace = true } solana-transaction-status = { workspace = true } spl-account-compression = { workspace = true, features = ["no-entrypoint"] } -spl-concurrent-merkle-tree = { workspace = true } -spl-token = { workspace = true, features = ["no-entrypoint"] } sqlx = { workspace = true, features = ["macros", "runtime-tokio-rustls", "postgres", "uuid", "offline", "json"] } stretto = { workspace = true, features = ["async"] } thiserror = { workspace = true } tokio = { workspace = true, features = ["tracing"] } -tokio-postgres = { workspace = true } -tokio-stream = { workspace = true } tracing-subscriber = { workspace = true, features = ["json", "env-filter", "ansi"] } url = { workspace = true } -uuid = { workspace = true } [lints] workspace = true diff --git a/nft_ingester/src/account_updates.rs b/nft_ingester/src/account_updates.rs index 68b6c83a0..354aaf9c6 100644 --- a/nft_ingester/src/account_updates.rs +++ b/nft_ingester/src/account_updates.rs @@ -1,18 +1,22 @@ -use std::sync::Arc; - -use crate::{ - metric, metrics::capture_result, program_transformers::ProgramTransformer, tasks::TaskData, -}; -use cadence_macros::{is_global_default_set, statsd_count, statsd_time}; -use chrono::Utc; -use log::{debug, error}; -use plerkle_messenger::{ConsumptionType, Messenger, MessengerConfig, RecvData}; -use plerkle_serialization::root_as_account_info; -use sqlx::{Pool, Postgres}; -use tokio::{ - sync::mpsc::UnboundedSender, - task::{JoinHandle, JoinSet}, - time::Instant, +use { + crate::{ + metric, + metrics::capture_result, + tasks::{create_download_metadata_notifier, TaskData}, + }, + cadence_macros::{is_global_default_set, statsd_count, statsd_time}, + chrono::Utc, + log::{debug, error}, + plerkle_messenger::{ConsumptionType, Messenger, MessengerConfig, RecvData}, + plerkle_serialization::root_as_account_info, + program_transformers::ProgramTransformer, + sqlx::{Pool, Postgres}, + std::sync::Arc, + tokio::{ + sync::mpsc::UnboundedSender, + task::{JoinHandle, JoinSet}, + time::Instant, + }, }; pub fn account_worker( @@ -26,7 +30,11 @@ pub fn account_worker( tokio::spawn(async move { let source = T::new(config).await; if let Ok(mut msg) = source { - let manager = Arc::new(ProgramTransformer::new(pool, bg_task_sender, false)); + let manager = Arc::new(ProgramTransformer::new( + pool, + create_download_metadata_notifier(bg_task_sender), + false, + )); loop { let e = msg.recv(stream_key, consumption_type.clone()).await; let mut tasks = JoinSet::new(); diff --git a/nft_ingester/src/error/mod.rs b/nft_ingester/src/error/mod.rs index 37ed5f24b..ddeae0ba6 100644 --- a/nft_ingester/src/error/mod.rs +++ b/nft_ingester/src/error/mod.rs @@ -1,27 +1,15 @@ -use crate::tasks::TaskData; -use blockbuster::error::BlockbusterError; -use plerkle_messenger::MessengerError; -use plerkle_serialization::error::PlerkleSerializationError; -use sea_orm::{DbErr, TransactionError}; -use thiserror::Error; -use tokio::sync::mpsc::error::SendError; +use { + crate::tasks::TaskData, plerkle_messenger::MessengerError, + plerkle_serialization::error::PlerkleSerializationError, sea_orm::DbErr, + tokio::sync::mpsc::error::SendError, +}; -#[derive(Error, Debug, PartialEq, Eq)] +#[derive(Debug, thiserror::Error)] pub enum IngesterError { - #[error("ChangeLog Event Malformed")] - ChangeLogEventMalformed, - #[error("Compressed Asset Event Malformed")] - CompressedAssetEventMalformed, #[error("Network Error: {0}")] BatchInitNetworkingError(String), - #[error("Error writing batch files")] - BatchInitIOError, - #[error("Storage listener error: ({msg})")] - StorageListenerError { msg: String }, #[error("Storage Write Error: {0}")] StorageWriteError(String), - #[error("NotImplemented")] - NotImplemented, #[error("Deserialization Error: {0}")] DeserializationError(String), #[error("Task Manager Error: {0}")] @@ -36,12 +24,6 @@ pub enum IngesterError { SerializatonError(String), #[error("Messenger error; {0}")] MessengerError(String), - #[error("Blockbuster Parsing error: {0}")] - ParsingError(String), - #[error("Database Error: {0}")] - DatabaseError(String), - #[error("Unknown Task Type: {0}")] - UnknownTaskType(String), #[error("BG Task Manager Not Started")] TaskManagerNotStarted, #[error("Unrecoverable task error: {0}")] @@ -50,8 +32,6 @@ pub enum IngesterError { CacheStorageWriteError(String), #[error("HttpError {status_code}")] HttpError { status_code: String }, - #[error("AssetIndex Error {0}")] - AssetIndexError(String), } impl From for IngesterError { @@ -72,30 +52,12 @@ impl From for IngesterError { } } -impl From for IngesterError { - fn from(err: BlockbusterError) -> Self { - IngesterError::ParsingError(err.to_string()) - } -} - -impl From for IngesterError { - fn from(_err: std::io::Error) -> Self { - IngesterError::BatchInitIOError - } -} - impl From for IngesterError { fn from(e: DbErr) -> Self { IngesterError::StorageWriteError(e.to_string()) } } -impl From> for IngesterError { - fn from(e: TransactionError) -> Self { - IngesterError::StorageWriteError(e.to_string()) - } -} - impl From> for IngesterError { fn from(err: SendError) -> Self { IngesterError::TaskManagerError(format!("Could not create task: {:?}", err.to_string())) diff --git a/nft_ingester/src/lib.rs b/nft_ingester/src/lib.rs index 48e29aa70..9aee9c6ba 100644 --- a/nft_ingester/src/lib.rs +++ b/nft_ingester/src/lib.rs @@ -5,7 +5,6 @@ pub mod config; pub mod database; pub mod error; pub mod metrics; -pub mod program_transformers; pub mod stream; pub mod tasks; pub mod transaction_notifications; diff --git a/nft_ingester/src/main.rs b/nft_ingester/src/main.rs index 6ae64a15e..1c13320b5 100644 --- a/nft_ingester/src/main.rs +++ b/nft_ingester/src/main.rs @@ -5,7 +5,6 @@ pub mod config; mod database; pub mod error; pub mod metrics; -mod program_transformers; mod stream; pub mod tasks; mod transaction_notifications; diff --git a/nft_ingester/src/metrics.rs b/nft_ingester/src/metrics.rs index 2e4ebb0d8..207c6318f 100644 --- a/nft_ingester/src/metrics.rs +++ b/nft_ingester/src/metrics.rs @@ -1,11 +1,12 @@ -use std::net::UdpSocket; - -use cadence::{BufferedUdpMetricSink, QueuingMetricSink, StatsdClient}; -use cadence_macros::{is_global_default_set, set_global_default, statsd_count, statsd_time}; -use log::{error, warn}; -use tokio::time::Instant; - -use crate::{config::IngesterConfig, error::IngesterError}; +use { + crate::config::IngesterConfig, + cadence::{BufferedUdpMetricSink, QueuingMetricSink, StatsdClient}, + cadence_macros::{is_global_default_set, set_global_default, statsd_count, statsd_time}, + log::{error, warn}, + program_transformers::error::ProgramTransformerError, + std::net::UdpSocket, + tokio::time::Instant, +}; #[macro_export] macro_rules! metric { @@ -42,7 +43,7 @@ pub fn capture_result( stream: &str, label: (&str, &str), tries: usize, - res: Result<(), IngesterError>, + res: Result<(), ProgramTransformerError>, proc: Instant, txn_sig: Option<&str>, account: Option, @@ -63,13 +64,13 @@ pub fn capture_result( } true } - Err(IngesterError::NotImplemented) => { + Err(ProgramTransformerError::NotImplemented) => { metric! { statsd_count!("ingester.not_implemented", 1, label.0 => label.1, "stream" => stream, "error" => "ni"); } true } - Err(IngesterError::DeserializationError(e)) => { + Err(ProgramTransformerError::DeserializationError(e)) => { metric! { statsd_count!("ingester.ingest_error", 1, label.0 => label.1, "stream" => stream, "error" => "de"); } @@ -83,7 +84,7 @@ pub fn capture_result( // Non-retryable error. true } - Err(IngesterError::ParsingError(e)) => { + Err(ProgramTransformerError::ParsingError(e)) => { metric! { statsd_count!("ingester.ingest_error", 1, label.0 => label.1, "stream" => stream, "error" => "parse"); } @@ -97,7 +98,7 @@ pub fn capture_result( // Non-retryable error. true } - Err(IngesterError::DatabaseError(e)) => { + Err(ProgramTransformerError::DatabaseError(e)) => { metric! { statsd_count!("ingester.database_error", 1, label.0 => label.1, "stream" => stream, "error" => "db"); } @@ -108,7 +109,7 @@ pub fn capture_result( } false } - Err(IngesterError::AssetIndexError(e)) => { + Err(ProgramTransformerError::AssetIndexError(e)) => { metric! { statsd_count!("ingester.index_error", 1, label.0 => label.1, "stream" => stream, "error" => "index"); } diff --git a/nft_ingester/src/program_transformers/bubblegum/burn.rs b/nft_ingester/src/program_transformers/bubblegum/burn.rs deleted file mode 100644 index 70ddcfcea..000000000 --- a/nft_ingester/src/program_transformers/bubblegum/burn.rs +++ /dev/null @@ -1,65 +0,0 @@ -use crate::{ - error::IngesterError, - program_transformers::bubblegum::{ - save_changelog_event, u32_to_u8_array, upsert_asset_with_seq, - }, -}; -use anchor_lang::prelude::Pubkey; -use blockbuster::{instruction::InstructionBundle, programs::bubblegum::BubblegumInstruction}; -use digital_asset_types::dao::asset; -use log::debug; -use sea_orm::{ - entity::*, query::*, sea_query::OnConflict, ConnectionTrait, DbBackend, EntityTrait, - TransactionTrait, -}; - -pub async fn burn<'c, T>( - parsing_result: &BubblegumInstruction, - bundle: &InstructionBundle<'c>, - txn: &'c T, - cl_audits: bool, -) -> Result<(), IngesterError> -where - T: ConnectionTrait + TransactionTrait, -{ - if let Some(cl) = &parsing_result.tree_update { - let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?; - let leaf_index = cl.index; - let (asset_id, _) = Pubkey::find_program_address( - &[ - "asset".as_bytes(), - cl.id.as_ref(), - u32_to_u8_array(leaf_index).as_ref(), - ], - &mpl_bubblegum::ID, - ); - debug!("Indexing burn for asset id: {:?}", asset_id); - let id_bytes = asset_id.to_bytes(); - - let asset_model = asset::ActiveModel { - id: Set(id_bytes.to_vec()), - burnt: Set(true), - ..Default::default() - }; - - // Upsert asset table `burnt` column. - let query = asset::Entity::insert(asset_model) - .on_conflict( - OnConflict::columns([asset::Column::Id]) - .update_columns([ - asset::Column::Burnt, - //TODO maybe handle slot updated. - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - txn.execute(query).await?; - - upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await?; - - return Ok(()); - } - Err(IngesterError::ParsingError( - "Ix not parsed correctly".to_string(), - )) -} diff --git a/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs b/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs deleted file mode 100644 index 1b8f5842a..000000000 --- a/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs +++ /dev/null @@ -1,74 +0,0 @@ -use crate::{ - error::IngesterError, - program_transformers::bubblegum::{ - save_changelog_event, upsert_asset_with_leaf_info, - upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq, - }, -}; -use blockbuster::{ - instruction::InstructionBundle, - programs::bubblegum::{BubblegumInstruction, LeafSchema}, -}; -use sea_orm::{ConnectionTrait, TransactionTrait}; - -pub async fn cancel_redeem<'c, T>( - parsing_result: &BubblegumInstruction, - bundle: &InstructionBundle<'c>, - txn: &'c T, - cl_audits: bool, -) -> Result<(), IngesterError> -where - T: ConnectionTrait + TransactionTrait, -{ - if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) { - let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?; - #[allow(unreachable_patterns)] - return match le.schema { - LeafSchema::V1 { - id, - owner, - delegate, - .. - } => { - let id_bytes = id.to_bytes(); - let owner_bytes = owner.to_bytes().to_vec(); - let delegate = if owner == delegate || delegate.to_bytes() == [0; 32] { - None - } else { - Some(delegate.to_bytes().to_vec()) - }; - let tree_id = cl.id.to_bytes(); - let nonce = cl.index as i64; - - // Partial update of asset table with just leaf. - upsert_asset_with_leaf_info( - txn, - id_bytes.to_vec(), - nonce, - tree_id.to_vec(), - le.leaf_hash.to_vec(), - le.schema.data_hash(), - le.schema.creator_hash(), - seq as i64, - false, - ) - .await?; - - // Partial update of asset table with just leaf owner and delegate. - upsert_asset_with_owner_and_delegate_info( - txn, - id_bytes.to_vec(), - owner_bytes, - delegate, - seq as i64, - ) - .await?; - - upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await - } - }; - } - Err(IngesterError::ParsingError( - "Ix not parsed correctly".to_string(), - )) -} diff --git a/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs b/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs deleted file mode 100644 index 2db92cd60..000000000 --- a/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs +++ /dev/null @@ -1,80 +0,0 @@ -use crate::program_transformers::bubblegum::{upsert_asset_with_seq, upsert_collection_info}; -use blockbuster::{ - instruction::InstructionBundle, - programs::bubblegum::{BubblegumInstruction, LeafSchema, Payload}, -}; -use log::debug; -use mpl_bubblegum::types::Collection; -use sea_orm::query::*; - -use super::{save_changelog_event, upsert_asset_with_leaf_info}; -use crate::error::IngesterError; -pub async fn process<'c, T>( - parsing_result: &BubblegumInstruction, - bundle: &InstructionBundle<'c>, - txn: &'c T, - cl_audits: bool, -) -> Result<(), IngesterError> -where - T: ConnectionTrait + TransactionTrait, -{ - if let (Some(le), Some(cl), Some(payload)) = ( - &parsing_result.leaf_update, - &parsing_result.tree_update, - &parsing_result.payload, - ) { - let (collection, verify) = match payload { - Payload::CollectionVerification { - collection, verify, .. - } => (*collection, verify), - _ => { - return Err(IngesterError::ParsingError( - "Ix not parsed correctly".to_string(), - )); - } - }; - debug!( - "Handling collection verification event for {} (verify: {}): {}", - collection, verify, bundle.txn_id - ); - let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?; - let id_bytes = match le.schema { - LeafSchema::V1 { id, .. } => id.to_bytes().to_vec(), - }; - let tree_id = cl.id.to_bytes(); - let nonce = cl.index as i64; - - // Partial update of asset table with just leaf. - upsert_asset_with_leaf_info( - txn, - id_bytes.to_vec(), - nonce, - tree_id.to_vec(), - le.leaf_hash.to_vec(), - le.schema.data_hash(), - le.schema.creator_hash(), - seq as i64, - false, - ) - .await?; - - upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await?; - - upsert_collection_info( - txn, - id_bytes.to_vec(), - Some(Collection { - key: collection, - verified: *verify, - }), - bundle.slot as i64, - seq as i64, - ) - .await?; - - return Ok(()); - }; - Err(IngesterError::ParsingError( - "Ix not parsed correctly".to_string(), - )) -} diff --git a/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs b/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs deleted file mode 100644 index 134fe89ca..000000000 --- a/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs +++ /dev/null @@ -1,107 +0,0 @@ -use crate::{ - error::IngesterError, - program_transformers::bubblegum::{ - save_changelog_event, upsert_asset_with_leaf_info, - upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq, upsert_creator_verified, - }, -}; -use blockbuster::{ - instruction::InstructionBundle, - programs::bubblegum::{BubblegumInstruction, LeafSchema, Payload}, -}; -use log::debug; -use sea_orm::{ConnectionTrait, TransactionTrait}; - -pub async fn process<'c, T>( - parsing_result: &BubblegumInstruction, - bundle: &InstructionBundle<'c>, - txn: &'c T, - value: bool, - cl_audits: bool, -) -> Result<(), IngesterError> -where - T: ConnectionTrait + TransactionTrait, -{ - if let (Some(le), Some(cl), Some(payload)) = ( - &parsing_result.leaf_update, - &parsing_result.tree_update, - &parsing_result.payload, - ) { - let (creator, verify) = match payload { - Payload::CreatorVerification { - creator, verify, .. - } => (creator, verify), - _ => { - return Err(IngesterError::ParsingError( - "Ix not parsed correctly".to_string(), - )); - } - }; - debug!( - "Handling creator verification event for creator {} (verify: {}): {}", - creator, verify, bundle.txn_id - ); - let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?; - - let asset_id_bytes = match le.schema { - LeafSchema::V1 { - id, - owner, - delegate, - .. - } => { - let id_bytes = id.to_bytes(); - let owner_bytes = owner.to_bytes().to_vec(); - let delegate = if owner == delegate || delegate.to_bytes() == [0; 32] { - None - } else { - Some(delegate.to_bytes().to_vec()) - }; - let tree_id = cl.id.to_bytes(); - let nonce = cl.index as i64; - - // Partial update of asset table with just leaf info. - upsert_asset_with_leaf_info( - txn, - id_bytes.to_vec(), - nonce, - tree_id.to_vec(), - le.leaf_hash.to_vec(), - le.schema.data_hash(), - le.schema.creator_hash(), - seq as i64, - false, - ) - .await?; - - // Partial update of asset table with just leaf owner and delegate. - upsert_asset_with_owner_and_delegate_info( - txn, - id_bytes.to_vec(), - owner_bytes, - delegate, - seq as i64, - ) - .await?; - - upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await?; - - id_bytes.to_vec() - } - }; - - upsert_creator_verified( - txn, - asset_id_bytes, - creator.to_bytes().to_vec(), - value, - seq as i64, - ) - .await?; - - return Ok(()); - } - Err(IngesterError::ParsingError( - "Ix not parsed correctly".to_string(), - )) -} diff --git a/nft_ingester/src/program_transformers/bubblegum/db.rs b/nft_ingester/src/program_transformers/bubblegum/db.rs deleted file mode 100644 index b7141e231..000000000 --- a/nft_ingester/src/program_transformers/bubblegum/db.rs +++ /dev/null @@ -1,442 +0,0 @@ -use crate::error::IngesterError; -use digital_asset_types::dao::{ - asset, asset_creators, asset_grouping, backfill_items, cl_audits, cl_items, -}; -use log::{debug, info}; -use mpl_bubblegum::types::Collection; -use sea_orm::{ - query::*, sea_query::OnConflict, ActiveValue::Set, ColumnTrait, DbBackend, EntityTrait, -}; -use spl_account_compression::events::ChangeLogEventV1; - -pub async fn save_changelog_event<'c, T>( - change_log_event: &ChangeLogEventV1, - slot: u64, - txn_id: &str, - txn: &T, - cl_audits: bool, -) -> Result -where - T: ConnectionTrait + TransactionTrait, -{ - insert_change_log(change_log_event, slot, txn_id, txn, cl_audits).await?; - Ok(change_log_event.seq) -} - -const fn node_idx_to_leaf_idx(index: i64, tree_height: u32) -> i64 { - index - 2i64.pow(tree_height) -} - -pub async fn insert_change_log<'c, T>( - change_log_event: &ChangeLogEventV1, - slot: u64, - txn_id: &str, - txn: &T, - cl_audits: bool, -) -> Result<(), IngesterError> -where - T: ConnectionTrait + TransactionTrait, -{ - let mut i: i64 = 0; - let depth = change_log_event.path.len() - 1; - let tree_id = change_log_event.id.as_ref(); - for p in change_log_event.path.iter() { - let node_idx = p.index as i64; - debug!( - "seq {}, index {} level {}, node {:?}, txn: {:?}", - change_log_event.seq, - p.index, - i, - bs58::encode(p.node).into_string(), - txn_id, - ); - let leaf_idx = if i == 0 { - Some(node_idx_to_leaf_idx(node_idx, depth as u32)) - } else { - None - }; - - let item = cl_items::ActiveModel { - tree: Set(tree_id.to_vec()), - level: Set(i), - node_idx: Set(node_idx), - hash: Set(p.node.as_ref().to_vec()), - seq: Set(change_log_event.seq as i64), - leaf_idx: Set(leaf_idx), - ..Default::default() - }; - - let audit_item: Option = if cl_audits { - let mut ai: cl_audits::ActiveModel = item.clone().into(); - ai.tx = Set(txn_id.to_string()); - Some(ai) - } else { - None - }; - - i += 1; - let mut query = cl_items::Entity::insert(item) - .on_conflict( - OnConflict::columns([cl_items::Column::Tree, cl_items::Column::NodeIdx]) - .update_columns([ - cl_items::Column::Hash, - cl_items::Column::Seq, - cl_items::Column::LeafIdx, - cl_items::Column::Level, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - query.sql = format!("{} WHERE excluded.seq > cl_items.seq", query.sql); - txn.execute(query) - .await - .map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?; - - // Insert the audit item after the insert into cl_items have been completed - if let Some(audit_item) = audit_item { - cl_audits::Entity::insert(audit_item).exec(txn).await?; - } - } - - // If and only if the entire path of nodes was inserted into the `cl_items` table, then insert - // a single row into the `backfill_items` table. This way if an incomplete path was inserted - // into `cl_items` due to an error, a gap will be created for the tree and the backfiller will - // fix it. - if i - 1 == depth as i64 { - // See if the tree already exists in the `backfill_items` table. - let rows = backfill_items::Entity::find() - .filter(backfill_items::Column::Tree.eq(tree_id)) - .limit(1) - .all(txn) - .await?; - - // If the tree does not exist in `backfill_items` and the sequence number is greater than 1, - // then we know we will need to backfill the tree from sequence number 1 up to the current - // sequence number. So in this case we set at flag to force checking the tree. - let force_chk = rows.is_empty() && change_log_event.seq > 1; - - info!("Adding to backfill_items table at level {}", i - 1); - let item = backfill_items::ActiveModel { - tree: Set(tree_id.to_vec()), - seq: Set(change_log_event.seq as i64), - slot: Set(slot as i64), - force_chk: Set(force_chk), - backfilled: Set(false), - failed: Set(false), - ..Default::default() - }; - - backfill_items::Entity::insert(item).exec(txn).await?; - } - - Ok(()) - //TODO -> set maximum size of path and break into multiple statements -} - -#[allow(clippy::too_many_arguments)] -pub async fn upsert_asset_with_leaf_info( - txn: &T, - id: Vec, - nonce: i64, - tree_id: Vec, - leaf: Vec, - data_hash: [u8; 32], - creator_hash: [u8; 32], - seq: i64, - was_decompressed: bool, -) -> Result<(), IngesterError> -where - T: ConnectionTrait + TransactionTrait, -{ - let data_hash = bs58::encode(data_hash).into_string().trim().to_string(); - let creator_hash = bs58::encode(creator_hash).into_string().trim().to_string(); - let model = asset::ActiveModel { - id: Set(id), - nonce: Set(Some(nonce)), - tree_id: Set(Some(tree_id)), - leaf: Set(Some(leaf)), - data_hash: Set(Some(data_hash)), - creator_hash: Set(Some(creator_hash)), - leaf_seq: Set(Some(seq)), - ..Default::default() - }; - - let mut query = asset::Entity::insert(model) - .on_conflict( - OnConflict::column(asset::Column::Id) - .update_columns([ - asset::Column::Nonce, - asset::Column::TreeId, - asset::Column::Leaf, - asset::Column::LeafSeq, - asset::Column::DataHash, - asset::Column::CreatorHash, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - - // If we are indexing decompression we will update the leaf regardless of if we have previously - // indexed decompression and regardless of seq. - if !was_decompressed { - query.sql = format!( - "{} WHERE (NOT asset.was_decompressed) AND (excluded.leaf_seq >= asset.leaf_seq OR asset.leaf_seq IS NULL)", - query.sql - ); - } - - txn.execute(query) - .await - .map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?; - - Ok(()) -} - -pub async fn upsert_asset_with_leaf_info_for_decompression( - txn: &T, - id: Vec, -) -> Result<(), IngesterError> -where - T: ConnectionTrait + TransactionTrait, -{ - let model = asset::ActiveModel { - id: Set(id), - leaf: Set(None), - nonce: Set(Some(0)), - leaf_seq: Set(None), - data_hash: Set(None), - creator_hash: Set(None), - tree_id: Set(None), - seq: Set(Some(0)), - ..Default::default() - }; - let query = asset::Entity::insert(model) - .on_conflict( - OnConflict::column(asset::Column::Id) - .update_columns([ - asset::Column::Leaf, - asset::Column::LeafSeq, - asset::Column::Nonce, - asset::Column::DataHash, - asset::Column::CreatorHash, - asset::Column::TreeId, - asset::Column::Seq, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - txn.execute(query) - .await - .map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?; - - Ok(()) -} - -pub async fn upsert_asset_with_owner_and_delegate_info( - txn: &T, - id: Vec, - owner: Vec, - delegate: Option>, - seq: i64, -) -> Result<(), IngesterError> -where - T: ConnectionTrait + TransactionTrait, -{ - let model = asset::ActiveModel { - id: Set(id), - owner: Set(Some(owner)), - delegate: Set(delegate), - owner_delegate_seq: Set(Some(seq)), // gummyroll seq - ..Default::default() - }; - - let mut query = asset::Entity::insert(model) - .on_conflict( - OnConflict::column(asset::Column::Id) - .update_columns([ - asset::Column::Owner, - asset::Column::Delegate, - asset::Column::OwnerDelegateSeq, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.owner_delegate_seq >= asset.owner_delegate_seq OR asset.owner_delegate_seq IS NULL", - query.sql - ); - - txn.execute(query) - .await - .map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?; - - Ok(()) -} - -pub async fn upsert_asset_with_compression_info( - txn: &T, - id: Vec, - compressed: bool, - compressible: bool, - supply: i64, - supply_mint: Option>, - was_decompressed: bool, -) -> Result<(), IngesterError> -where - T: ConnectionTrait + TransactionTrait, -{ - let model = asset::ActiveModel { - id: Set(id), - compressed: Set(compressed), - compressible: Set(compressible), - supply: Set(supply), - supply_mint: Set(supply_mint), - was_decompressed: Set(was_decompressed), - ..Default::default() - }; - - let mut query = asset::Entity::insert(model) - .on_conflict( - OnConflict::columns([asset::Column::Id]) - .update_columns([ - asset::Column::Compressed, - asset::Column::Compressible, - asset::Column::Supply, - asset::Column::SupplyMint, - asset::Column::WasDecompressed, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - query.sql = format!("{} WHERE NOT asset.was_decompressed", query.sql); - txn.execute(query).await?; - - Ok(()) -} - -pub async fn upsert_asset_with_seq(txn: &T, id: Vec, seq: i64) -> Result<(), IngesterError> -where - T: ConnectionTrait + TransactionTrait, -{ - let model = asset::ActiveModel { - id: Set(id), - seq: Set(Some(seq)), - ..Default::default() - }; - - let mut query = asset::Entity::insert(model) - .on_conflict( - OnConflict::column(asset::Column::Id) - .update_columns([asset::Column::Seq]) - .to_owned(), - ) - .build(DbBackend::Postgres); - - query.sql = format!( - "{} WHERE (NOT asset.was_decompressed) AND (excluded.seq >= asset.seq OR asset.seq IS NULL)", - query.sql - ); - - txn.execute(query) - .await - .map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?; - - Ok(()) -} - -pub async fn upsert_creator_verified( - txn: &T, - asset_id: Vec, - creator: Vec, - verified: bool, - seq: i64, -) -> Result<(), IngesterError> -where - T: ConnectionTrait + TransactionTrait, -{ - let model = asset_creators::ActiveModel { - asset_id: Set(asset_id), - creator: Set(creator), - verified: Set(verified), - seq: Set(Some(seq)), - ..Default::default() - }; - - let mut query = asset_creators::Entity::insert(model) - .on_conflict( - OnConflict::columns([ - asset_creators::Column::AssetId, - asset_creators::Column::Creator, - ]) - .update_columns([ - asset_creators::Column::Verified, - asset_creators::Column::Seq, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - - query.sql = format!( - "{} WHERE excluded.seq >= asset_creators.seq OR asset_creators.seq is NULL", - query.sql - ); - - txn.execute(query) - .await - .map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?; - - Ok(()) -} - -pub async fn upsert_collection_info( - txn: &T, - asset_id: Vec, - collection: Option, - slot_updated: i64, - seq: i64, -) -> Result<(), IngesterError> -where - T: ConnectionTrait + TransactionTrait, -{ - let (group_value, verified) = match collection { - Some(c) => (Some(c.key.to_string()), c.verified), - None => (None, false), - }; - - let model = asset_grouping::ActiveModel { - asset_id: Set(asset_id), - group_key: Set("collection".to_string()), - group_value: Set(group_value), - verified: Set(Some(verified)), - slot_updated: Set(Some(slot_updated)), - group_info_seq: Set(Some(seq)), - ..Default::default() - }; - - let mut query = asset_grouping::Entity::insert(model) - .on_conflict( - OnConflict::columns([ - asset_grouping::Column::AssetId, - asset_grouping::Column::GroupKey, - ]) - .update_columns([ - asset_grouping::Column::GroupValue, - asset_grouping::Column::Verified, - asset_grouping::Column::SlotUpdated, - asset_grouping::Column::GroupInfoSeq, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - - query.sql = format!( - "{} WHERE excluded.group_info_seq >= asset_grouping.group_info_seq OR asset_grouping.group_info_seq IS NULL", - query.sql - ); - - txn.execute(query) - .await - .map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?; - - Ok(()) -} diff --git a/nft_ingester/src/program_transformers/bubblegum/decompress.rs b/nft_ingester/src/program_transformers/bubblegum/decompress.rs deleted file mode 100644 index a024d5ebe..000000000 --- a/nft_ingester/src/program_transformers/bubblegum/decompress.rs +++ /dev/null @@ -1,32 +0,0 @@ -use crate::{ - error::IngesterError, - program_transformers::bubblegum::upsert_asset_with_leaf_info_for_decompression, -}; -use blockbuster::{instruction::InstructionBundle, programs::bubblegum::BubblegumInstruction}; -use sea_orm::{query::*, ConnectionTrait}; - -use super::upsert_asset_with_compression_info; - -pub async fn decompress<'c, T>( - _parsing_result: &BubblegumInstruction, - bundle: &InstructionBundle<'c>, - txn: &'c T, -) -> Result<(), IngesterError> -where - T: ConnectionTrait + TransactionTrait, -{ - let id_bytes = bundle.keys.get(3).unwrap().0.as_slice(); - - // Partial update of asset table with just leaf. - upsert_asset_with_leaf_info_for_decompression(txn, id_bytes.to_vec()).await?; - upsert_asset_with_compression_info( - txn, - id_bytes.to_vec(), - false, - false, - 1, - Some(id_bytes.to_vec()), - true, - ) - .await -} diff --git a/nft_ingester/src/program_transformers/bubblegum/delegate.rs b/nft_ingester/src/program_transformers/bubblegum/delegate.rs deleted file mode 100644 index 88896de64..000000000 --- a/nft_ingester/src/program_transformers/bubblegum/delegate.rs +++ /dev/null @@ -1,72 +0,0 @@ -use crate::{ - error::IngesterError, - program_transformers::bubblegum::{ - save_changelog_event, upsert_asset_with_leaf_info, - upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq, - }, -}; -use blockbuster::{ - instruction::InstructionBundle, - programs::bubblegum::{BubblegumInstruction, LeafSchema}, -}; -use sea_orm::{ConnectionTrait, TransactionTrait}; - -pub async fn delegate<'c, T>( - parsing_result: &BubblegumInstruction, - bundle: &InstructionBundle<'c>, - txn: &'c T, - cl_audits: bool, -) -> Result<(), IngesterError> -where - T: ConnectionTrait + TransactionTrait, -{ - if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) { - let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?; - return match le.schema { - LeafSchema::V1 { - id, - owner, - delegate, - .. - } => { - let id_bytes = id.to_bytes(); - let owner_bytes = owner.to_bytes().to_vec(); - let delegate = if owner == delegate || delegate.to_bytes() == [0; 32] { - None - } else { - Some(delegate.to_bytes().to_vec()) - }; - let tree_id = cl.id.to_bytes(); - - // Partial update of asset table with just leaf. - upsert_asset_with_leaf_info( - txn, - id_bytes.to_vec(), - cl.index as i64, - tree_id.to_vec(), - le.leaf_hash.to_vec(), - le.schema.data_hash(), - le.schema.creator_hash(), - seq as i64, - false, - ) - .await?; - - // Partial update of asset table with just leaf owner and delegate. - upsert_asset_with_owner_and_delegate_info( - txn, - id_bytes.to_vec(), - owner_bytes, - delegate, - seq as i64, - ) - .await?; - - upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await - } - }; - } - Err(IngesterError::ParsingError( - "Ix not parsed correctly".to_string(), - )) -} diff --git a/nft_ingester/src/program_transformers/bubblegum/mod.rs b/nft_ingester/src/program_transformers/bubblegum/mod.rs deleted file mode 100644 index bcc102c0b..000000000 --- a/nft_ingester/src/program_transformers/bubblegum/mod.rs +++ /dev/null @@ -1,108 +0,0 @@ -use blockbuster::{ - self, - instruction::InstructionBundle, - programs::bubblegum::{BubblegumInstruction, InstructionName}, -}; -use log::{debug, info}; -use sea_orm::{ConnectionTrait, TransactionTrait}; -use tokio::sync::mpsc::UnboundedSender; - -mod burn; -mod cancel_redeem; -mod collection_verification; -mod creator_verification; -mod db; -mod decompress; -mod delegate; -mod mint_v1; -mod redeem; -mod transfer; - -pub use db::*; - -use crate::{error::IngesterError, tasks::TaskData}; - -pub async fn handle_bubblegum_instruction<'c, T>( - parsing_result: &'c BubblegumInstruction, - bundle: &'c InstructionBundle<'c>, - txn: &T, - task_manager: &UnboundedSender, - cl_audits: bool, -) -> Result<(), IngesterError> -where - T: ConnectionTrait + TransactionTrait, -{ - let ix_type = &parsing_result.instruction; - - // @TODO this would be much better served by implemneting Debug trait on the InstructionName - // or wrapping it into something that can display it more neatly. - let ix_str = match ix_type { - InstructionName::Unknown => "Unknown", - InstructionName::MintV1 => "MintV1", - InstructionName::MintToCollectionV1 => "MintToCollectionV1", - InstructionName::Redeem => "Redeem", - InstructionName::CancelRedeem => "CancelRedeem", - InstructionName::Transfer => "Transfer", - InstructionName::Delegate => "Delegate", - InstructionName::DecompressV1 => "DecompressV1", - InstructionName::Compress => "Compress", - InstructionName::Burn => "Burn", - InstructionName::CreateTree => "CreateTree", - InstructionName::VerifyCreator => "VerifyCreator", - InstructionName::UnverifyCreator => "UnverifyCreator", - InstructionName::VerifyCollection => "VerifyCollection", - InstructionName::UnverifyCollection => "UnverifyCollection", - InstructionName::SetAndVerifyCollection => "SetAndVerifyCollection", - InstructionName::SetDecompressibleState | InstructionName::UpdateMetadata => todo!(), - }; - info!("BGUM instruction txn={:?}: {:?}", ix_str, bundle.txn_id); - - match ix_type { - InstructionName::Transfer => { - transfer::transfer(parsing_result, bundle, txn, cl_audits).await?; - } - InstructionName::Burn => { - burn::burn(parsing_result, bundle, txn, cl_audits).await?; - } - InstructionName::Delegate => { - delegate::delegate(parsing_result, bundle, txn, cl_audits).await?; - } - InstructionName::MintV1 | InstructionName::MintToCollectionV1 => { - let task = mint_v1::mint_v1(parsing_result, bundle, txn, cl_audits).await?; - - if let Some(t) = task { - task_manager.send(t)?; - } - } - InstructionName::Redeem => { - redeem::redeem(parsing_result, bundle, txn, cl_audits).await?; - } - InstructionName::CancelRedeem => { - cancel_redeem::cancel_redeem(parsing_result, bundle, txn, cl_audits).await?; - } - InstructionName::DecompressV1 => { - decompress::decompress(parsing_result, bundle, txn).await?; - } - InstructionName::VerifyCreator => { - creator_verification::process(parsing_result, bundle, txn, true, cl_audits).await?; - } - InstructionName::UnverifyCreator => { - creator_verification::process(parsing_result, bundle, txn, false, cl_audits).await?; - } - InstructionName::VerifyCollection - | InstructionName::UnverifyCollection - | InstructionName::SetAndVerifyCollection => { - collection_verification::process(parsing_result, bundle, txn, cl_audits).await?; - } - _ => debug!("Bubblegum: Not Implemented Instruction"), - } - Ok(()) -} - -// PDA lookup requires an 8-byte array. -fn u32_to_u8_array(value: u32) -> [u8; 8] { - let bytes: [u8; 4] = value.to_le_bytes(); - let mut result: [u8; 8] = [0; 8]; - result[..4].copy_from_slice(&bytes); - result -} diff --git a/nft_ingester/src/program_transformers/bubblegum/redeem.rs b/nft_ingester/src/program_transformers/bubblegum/redeem.rs deleted file mode 100644 index b9b7f2c27..000000000 --- a/nft_ingester/src/program_transformers/bubblegum/redeem.rs +++ /dev/null @@ -1,59 +0,0 @@ -use anchor_lang::prelude::Pubkey; -use log::debug; - -use crate::{ - error::IngesterError, - program_transformers::bubblegum::{ - save_changelog_event, u32_to_u8_array, upsert_asset_with_leaf_info, upsert_asset_with_seq, - }, -}; -use blockbuster::{instruction::InstructionBundle, programs::bubblegum::BubblegumInstruction}; -use sea_orm::{ConnectionTrait, TransactionTrait}; - -pub async fn redeem<'c, T>( - parsing_result: &BubblegumInstruction, - bundle: &InstructionBundle<'c>, - txn: &'c T, - cl_audits: bool, -) -> Result<(), IngesterError> -where - T: ConnectionTrait + TransactionTrait, -{ - if let Some(cl) = &parsing_result.tree_update { - let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?; - let leaf_index = cl.index; - let (asset_id, _) = Pubkey::find_program_address( - &[ - "asset".as_bytes(), - cl.id.as_ref(), - u32_to_u8_array(leaf_index).as_ref(), - ], - &mpl_bubblegum::ID, - ); - debug!("Indexing redeem for asset id: {:?}", asset_id); - let id_bytes = asset_id.to_bytes(); - let tree_id = cl.id.to_bytes(); - let nonce = cl.index as i64; - - // Partial update of asset table with just leaf. - upsert_asset_with_leaf_info( - txn, - id_bytes.to_vec(), - nonce, - tree_id.to_vec(), - vec![0; 32], - [0; 32], - [0; 32], - seq as i64, - false, - ) - .await?; - - upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await?; - - return Ok(()); - } - Err(IngesterError::ParsingError( - "Ix not parsed correctly".to_string(), - )) -} diff --git a/nft_ingester/src/program_transformers/bubblegum/transfer.rs b/nft_ingester/src/program_transformers/bubblegum/transfer.rs deleted file mode 100644 index 573f33a8f..000000000 --- a/nft_ingester/src/program_transformers/bubblegum/transfer.rs +++ /dev/null @@ -1,75 +0,0 @@ -use super::save_changelog_event; -use crate::{ - error::IngesterError, - program_transformers::bubblegum::{ - upsert_asset_with_leaf_info, upsert_asset_with_owner_and_delegate_info, - upsert_asset_with_seq, - }, -}; -use blockbuster::{ - instruction::InstructionBundle, - programs::bubblegum::{BubblegumInstruction, LeafSchema}, -}; -use sea_orm::{ConnectionTrait, TransactionTrait}; - -pub async fn transfer<'c, T>( - parsing_result: &BubblegumInstruction, - bundle: &InstructionBundle<'c>, - txn: &'c T, - cl_audits: bool, -) -> Result<(), IngesterError> -where - T: ConnectionTrait + TransactionTrait, -{ - if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) { - let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?; - #[allow(unreachable_patterns)] - return match le.schema { - LeafSchema::V1 { - id, - owner, - delegate, - .. - } => { - let id_bytes = id.to_bytes(); - let owner_bytes = owner.to_bytes().to_vec(); - let delegate = if owner == delegate || delegate.to_bytes() == [0; 32] { - None - } else { - Some(delegate.to_bytes().to_vec()) - }; - let tree_id = cl.id.to_bytes(); - let nonce = cl.index as i64; - - // Partial update of asset table with just leaf. - upsert_asset_with_leaf_info( - txn, - id_bytes.to_vec(), - nonce, - tree_id.to_vec(), - le.leaf_hash.to_vec(), - le.schema.data_hash(), - le.schema.creator_hash(), - seq as i64, - false, - ) - .await?; - - // Partial update of asset table with just leaf owner and delegate. - upsert_asset_with_owner_and_delegate_info( - txn, - id_bytes.to_vec(), - owner_bytes, - delegate, - seq as i64, - ) - .await?; - - upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await - } - }; - } - Err(IngesterError::ParsingError( - "Ix not parsed correctly".to_string(), - )) -} diff --git a/nft_ingester/src/program_transformers/mod.rs b/nft_ingester/src/program_transformers/mod.rs deleted file mode 100644 index fc555829d..000000000 --- a/nft_ingester/src/program_transformers/mod.rs +++ /dev/null @@ -1,189 +0,0 @@ -use crate::{error::IngesterError, tasks::TaskData}; -use blockbuster::{ - instruction::{order_instructions, InstructionBundle, IxPair}, - program_handler::ProgramParser, - programs::{ - bubblegum::BubblegumParser, token_account::TokenAccountParser, - token_metadata::TokenMetadataParser, ProgramParseResult, - }, -}; -use log::{debug, error, info}; -use plerkle_serialization::{AccountInfo, Pubkey as FBPubkey, TransactionInfo}; -use sea_orm::{DatabaseConnection, SqlxPostgresConnector}; -use solana_sdk::pubkey::Pubkey; -use sqlx::PgPool; -use std::collections::{HashMap, HashSet, VecDeque}; -use tokio::sync::mpsc::UnboundedSender; - -use crate::program_transformers::{ - bubblegum::handle_bubblegum_instruction, token::handle_token_program_account, - token_metadata::handle_token_metadata_account, -}; - -mod bubblegum; -mod token; -mod token_metadata; - -pub struct ProgramTransformer { - storage: DatabaseConnection, - task_sender: UnboundedSender, - matchers: HashMap>, - key_set: HashSet, - cl_audits: bool, -} - -impl ProgramTransformer { - pub fn new(pool: PgPool, task_sender: UnboundedSender, cl_audits: bool) -> Self { - let mut matchers: HashMap> = HashMap::with_capacity(1); - let bgum = BubblegumParser {}; - let token_metadata = TokenMetadataParser {}; - let token = TokenAccountParser {}; - matchers.insert(bgum.key(), Box::new(bgum)); - matchers.insert(token_metadata.key(), Box::new(token_metadata)); - matchers.insert(token.key(), Box::new(token)); - let hs = matchers.iter().fold(HashSet::new(), |mut acc, (k, _)| { - acc.insert(*k); - acc - }); - let pool: PgPool = pool; - ProgramTransformer { - storage: SqlxPostgresConnector::from_sqlx_postgres_pool(pool), - task_sender, - matchers, - key_set: hs, - cl_audits, - } - } - - pub fn break_transaction<'i>( - &self, - tx: &'i TransactionInfo<'i>, - ) -> VecDeque<(IxPair<'i>, Option>>)> { - let ref_set: HashSet<&[u8]> = self.key_set.iter().map(|k| k.as_ref()).collect(); - order_instructions(ref_set, tx) - } - - #[allow(clippy::borrowed_box)] - pub fn match_program(&self, key: &FBPubkey) -> Option<&Box> { - self.matchers - .get(&Pubkey::try_from(key.0.as_slice()).expect("valid key from FlatBuffer")) - } - - pub async fn handle_transaction<'a>( - &self, - tx: &'a TransactionInfo<'a>, - ) -> Result<(), IngesterError> { - let sig: Option<&str> = tx.signature(); - info!("Handling Transaction: {:?}", sig); - let instructions = self.break_transaction(tx); - let accounts = tx.account_keys().unwrap_or_default(); - let slot = tx.slot(); - let txn_id = tx.signature().unwrap_or(""); - let mut keys: Vec = Vec::with_capacity(accounts.len()); - for k in accounts.into_iter() { - keys.push(*k); - } - let mut not_impl = 0; - let ixlen = instructions.len(); - debug!("Instructions: {}", ixlen); - let contains = instructions - .iter() - .filter(|(ib, _inner)| ib.0 .0.as_ref() == mpl_bubblegum::ID.as_ref()); - debug!("Instructions bgum: {}", contains.count()); - for (outer_ix, inner_ix) in instructions { - let (program, instruction) = outer_ix; - let ix_accounts = instruction.accounts().unwrap().iter().collect::>(); - let ix_account_len = ix_accounts.len(); - let max = ix_accounts.iter().max().copied().unwrap_or(0) as usize; - if keys.len() < max { - return Err(IngesterError::DeserializationError( - "Missing Accounts in Serialized Ixn/Txn".to_string(), - )); - } - let ix_accounts = - ix_accounts - .iter() - .fold(Vec::with_capacity(ix_account_len), |mut acc, a| { - if let Some(key) = keys.get(*a as usize) { - acc.push(*key); - } - acc - }); - let ix = InstructionBundle { - txn_id, - program, - instruction: Some(instruction), - inner_ix, - keys: ix_accounts.as_slice(), - slot, - }; - - if let Some(program) = self.match_program(&ix.program) { - debug!("Found a ix for program: {:?}", program.key()); - let result = program.handle_instruction(&ix)?; - let concrete = result.result_type(); - match concrete { - ProgramParseResult::Bubblegum(parsing_result) => { - handle_bubblegum_instruction( - parsing_result, - &ix, - &self.storage, - &self.task_sender, - self.cl_audits, - ) - .await - .map_err(|err| { - error!( - "Failed to handle bubblegum instruction for txn {:?}: {:?}", - sig, err - ); - err - })?; - } - _ => { - not_impl += 1; - } - }; - } - } - - if not_impl == ixlen { - debug!("Not imple"); - return Err(IngesterError::NotImplemented); - } - Ok(()) - } - - pub async fn handle_account_update<'b>( - &self, - acct: AccountInfo<'b>, - ) -> Result<(), IngesterError> { - let owner = acct.owner().unwrap(); - if let Some(program) = self.match_program(owner) { - let result = program.handle_account(&acct)?; - let concrete = result.result_type(); - match concrete { - ProgramParseResult::TokenMetadata(parsing_result) => { - handle_token_metadata_account( - &acct, - parsing_result, - &self.storage, - &self.task_sender, - ) - .await - } - ProgramParseResult::TokenProgramAccount(parsing_result) => { - handle_token_program_account( - &acct, - parsing_result, - &self.storage, - &self.task_sender, - ) - .await - } - _ => Err(IngesterError::NotImplemented), - }?; - } - Ok(()) - } -} diff --git a/nft_ingester/src/program_transformers/token/mod.rs b/nft_ingester/src/program_transformers/token/mod.rs deleted file mode 100644 index c32cad020..000000000 --- a/nft_ingester/src/program_transformers/token/mod.rs +++ /dev/null @@ -1,141 +0,0 @@ -use crate::{error::IngesterError, tasks::TaskData}; -use blockbuster::programs::token_account::TokenProgramAccount; -use digital_asset_types::dao::{asset, token_accounts, tokens}; -use plerkle_serialization::AccountInfo; -use sea_orm::{ - entity::*, query::*, sea_query::OnConflict, ActiveValue::Set, ConnectionTrait, - DatabaseConnection, DbBackend, EntityTrait, -}; -use solana_sdk::program_option::COption; -use spl_token::state::AccountState; -use tokio::sync::mpsc::UnboundedSender; - -pub async fn handle_token_program_account<'a, 'b, 'c>( - account_update: &'a AccountInfo<'a>, - parsing_result: &'b TokenProgramAccount, - db: &'c DatabaseConnection, - _task_manager: &UnboundedSender, -) -> Result<(), IngesterError> { - let key = *account_update.pubkey().unwrap(); - let key_bytes = key.0.to_vec(); - let spl_token_program = account_update.owner().unwrap().0.to_vec(); - match &parsing_result { - TokenProgramAccount::TokenAccount(ta) => { - let mint = ta.mint.to_bytes().to_vec(); - let delegate: Option> = match ta.delegate { - COption::Some(d) => Some(d.to_bytes().to_vec()), - COption::None => None, - }; - let frozen = matches!(ta.state, AccountState::Frozen); - let owner = ta.owner.to_bytes().to_vec(); - let model = token_accounts::ActiveModel { - pubkey: Set(key_bytes), - mint: Set(mint.clone()), - delegate: Set(delegate.clone()), - owner: Set(owner.clone()), - frozen: Set(frozen), - delegated_amount: Set(ta.delegated_amount as i64), - token_program: Set(spl_token_program), - slot_updated: Set(account_update.slot() as i64), - amount: Set(ta.amount as i64), - close_authority: Set(None), - }; - - let mut query = token_accounts::Entity::insert(model) - .on_conflict( - OnConflict::columns([token_accounts::Column::Pubkey]) - .update_columns([ - token_accounts::Column::Mint, - token_accounts::Column::DelegatedAmount, - token_accounts::Column::Delegate, - token_accounts::Column::Amount, - token_accounts::Column::Frozen, - token_accounts::Column::TokenProgram, - token_accounts::Column::Owner, - token_accounts::Column::CloseAuthority, - token_accounts::Column::SlotUpdated, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated > token_accounts.slot_updated", - query.sql - ); - db.execute(query).await?; - let txn = db.begin().await?; - let asset_update: Option = asset::Entity::find_by_id(mint) - .filter(asset::Column::OwnerType.eq("single")) - .one(&txn) - .await?; - if let Some(asset) = asset_update { - // will only update owner if token account balance is non-zero - if ta.amount > 0 { - let mut active: asset::ActiveModel = asset.into(); - active.owner = Set(Some(owner)); - active.delegate = Set(delegate); - active.frozen = Set(frozen); - active.save(&txn).await?; - } - } - txn.commit().await?; - Ok(()) - } - TokenProgramAccount::Mint(m) => { - let freeze_auth: Option> = match m.freeze_authority { - COption::Some(d) => Some(d.to_bytes().to_vec()), - COption::None => None, - }; - let mint_auth: Option> = match m.mint_authority { - COption::Some(d) => Some(d.to_bytes().to_vec()), - COption::None => None, - }; - let model = tokens::ActiveModel { - mint: Set(key_bytes.clone()), - token_program: Set(spl_token_program), - slot_updated: Set(account_update.slot() as i64), - supply: Set(m.supply as i64), - decimals: Set(m.decimals as i32), - close_authority: Set(None), - extension_data: Set(None), - mint_authority: Set(mint_auth), - freeze_authority: Set(freeze_auth), - }; - - let mut query = tokens::Entity::insert(model) - .on_conflict( - OnConflict::columns([tokens::Column::Mint]) - .update_columns([ - tokens::Column::Supply, - tokens::Column::TokenProgram, - tokens::Column::MintAuthority, - tokens::Column::CloseAuthority, - tokens::Column::ExtensionData, - tokens::Column::SlotUpdated, - tokens::Column::Decimals, - tokens::Column::FreezeAuthority, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated > tokens.slot_updated", - query.sql - ); - db.execute(query).await?; - let asset_update: Option = asset::Entity::find_by_id(key_bytes.clone()) - .filter(asset::Column::OwnerType.eq("single")) - .one(db) - .await?; - if let Some(asset) = asset_update { - let mut active: asset::ActiveModel = asset.into(); - active.supply = Set(m.supply as i64); - active.supply_mint = Set(Some(key_bytes)); - active.save(db).await?; - } - Ok(()) - } - _ => Err(IngesterError::NotImplemented), - }?; - Ok(()) -} diff --git a/nft_ingester/src/program_transformers/token_metadata/master_edition.rs b/nft_ingester/src/program_transformers/token_metadata/master_edition.rs deleted file mode 100644 index 4e30970b6..000000000 --- a/nft_ingester/src/program_transformers/token_metadata/master_edition.rs +++ /dev/null @@ -1,94 +0,0 @@ -use crate::error::IngesterError; -use blockbuster::token_metadata::state::{Key, MasterEditionV1, MasterEditionV2}; -use digital_asset_types::dao::{ - asset, asset_v1_account_attachments, - sea_orm_active_enums::{SpecificationAssetClass, V1AccountAttachments}, -}; -use plerkle_serialization::Pubkey as FBPubkey; -use sea_orm::{ - entity::*, query::*, sea_query::OnConflict, ActiveValue::Set, ConnectionTrait, - DatabaseTransaction, DbBackend, EntityTrait, -}; - -pub async fn save_v2_master_edition( - id: FBPubkey, - slot: u64, - me_data: &MasterEditionV2, - txn: &DatabaseTransaction, -) -> Result<(), IngesterError> { - save_master_edition( - V1AccountAttachments::MasterEditionV2, - id, - slot, - me_data, - txn, - ) - .await -} - -pub async fn save_v1_master_edition( - id: FBPubkey, - slot: u64, - me_data: &MasterEditionV1, - txn: &DatabaseTransaction, -) -> Result<(), IngesterError> { - let bridge = MasterEditionV2 { - supply: me_data.supply, - max_supply: me_data.max_supply, - key: Key::MasterEditionV1, // is this weird? - }; - save_master_edition( - V1AccountAttachments::MasterEditionV1, - id, - slot, - &bridge, - txn, - ) - .await -} -pub async fn save_master_edition( - _version: V1AccountAttachments, - id: FBPubkey, - slot: u64, - me_data: &MasterEditionV2, - txn: &DatabaseTransaction, -) -> Result<(), IngesterError> { - let id_bytes = id.0.to_vec(); - let master_edition: Option<(asset_v1_account_attachments::Model, Option)> = - asset_v1_account_attachments::Entity::find_by_id(id.0.to_vec()) - .find_also_related(asset::Entity) - .join(JoinType::InnerJoin, asset::Relation::AssetData.def()) - .one(txn) - .await?; - let ser = serde_json::to_value(me_data) - .map_err(|e| IngesterError::SerializatonError(e.to_string()))?; - - let model = asset_v1_account_attachments::ActiveModel { - id: Set(id_bytes), - attachment_type: Set(V1AccountAttachments::MasterEditionV1), - data: Set(Some(ser)), - slot_updated: Set(slot as i64), - ..Default::default() - }; - - if let Some((_me, Some(asset))) = master_edition { - let mut updatable: asset::ActiveModel = asset.into(); - updatable.supply = Set(1); - updatable.specification_asset_class = Set(Some(SpecificationAssetClass::Nft)); - updatable.update(txn).await?; - } - - let query = asset_v1_account_attachments::Entity::insert(model) - .on_conflict( - OnConflict::columns([asset_v1_account_attachments::Column::Id]) - .update_columns([ - asset_v1_account_attachments::Column::AttachmentType, - asset_v1_account_attachments::Column::Data, - asset_v1_account_attachments::Column::SlotUpdated, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - txn.execute(query).await?; - Ok(()) -} diff --git a/nft_ingester/src/program_transformers/token_metadata/mod.rs b/nft_ingester/src/program_transformers/token_metadata/mod.rs deleted file mode 100644 index 10ab9a74c..000000000 --- a/nft_ingester/src/program_transformers/token_metadata/mod.rs +++ /dev/null @@ -1,54 +0,0 @@ -mod master_edition; -mod v1_asset; - -use crate::{ - error::IngesterError, - program_transformers::token_metadata::{ - master_edition::{save_v1_master_edition, save_v2_master_edition}, - v1_asset::{burn_v1_asset, save_v1_asset}, - }, - tasks::TaskData, -}; -use blockbuster::programs::token_metadata::{TokenMetadataAccountData, TokenMetadataAccountState}; -use plerkle_serialization::AccountInfo; -use sea_orm::{DatabaseConnection, TransactionTrait}; -use tokio::sync::mpsc::UnboundedSender; - -pub async fn handle_token_metadata_account<'a, 'b, 'c>( - account_update: &'a AccountInfo<'a>, - parsing_result: &'b TokenMetadataAccountState, - db: &'c DatabaseConnection, - task_manager: &UnboundedSender, -) -> Result<(), IngesterError> { - let key = *account_update.pubkey().unwrap(); - match &parsing_result.data { - TokenMetadataAccountData::EmptyAccount => { - burn_v1_asset(db, key, account_update.slot()).await?; - Ok(()) - } - TokenMetadataAccountData::MasterEditionV1(m) => { - let txn = db.begin().await?; - save_v1_master_edition(key, account_update.slot(), m, &txn).await?; - txn.commit().await?; - Ok(()) - } - TokenMetadataAccountData::MetadataV1(m) => { - let task = save_v1_asset(db, m.mint.as_ref().into(), account_update.slot(), m).await?; - if let Some(task) = task { - task_manager.send(task)?; - } - Ok(()) - } - TokenMetadataAccountData::MasterEditionV2(m) => { - let txn = db.begin().await?; - save_v2_master_edition(key, account_update.slot(), m, &txn).await?; - txn.commit().await?; - Ok(()) - } - // TokenMetadataAccountData::EditionMarker(_) => {} - // TokenMetadataAccountData::UseAuthorityRecord(_) => {} - // TokenMetadataAccountData::CollectionAuthorityRecord(_) => {} - _ => Err(IngesterError::NotImplemented), - }?; - Ok(()) -} diff --git a/nft_ingester/src/tasks/common/mod.rs b/nft_ingester/src/tasks/common/mod.rs index bf39e455d..28d067a14 100644 --- a/nft_ingester/src/tasks/common/mod.rs +++ b/nft_ingester/src/tasks/common/mod.rs @@ -1,16 +1,46 @@ -use super::{BgTask, FromTaskData, IngesterError, IntoTaskData, TaskData}; -use async_trait::async_trait; -use chrono::NaiveDateTime; -use digital_asset_types::dao::asset_data; -use log::debug; -use reqwest::{Client, ClientBuilder}; -use sea_orm::*; -use serde::{Deserialize, Serialize}; -use std::{ - fmt::{Display, Formatter}, - time::Duration, +use { + super::{BgTask, FromTaskData, IngesterError, IntoTaskData, TaskData}, + async_trait::async_trait, + chrono::{NaiveDateTime, Utc}, + digital_asset_types::dao::asset_data, + futures::future::BoxFuture, + log::debug, + program_transformers::{DownloadMetadataInfo, DownloadMetadataNotifier}, + reqwest::{Client, ClientBuilder}, + sea_orm::*, + serde::{Deserialize, Serialize}, + std::{ + fmt::{Display, Formatter}, + time::Duration, + }, + tokio::sync::mpsc::UnboundedSender, + url::Url, }; -use url::Url; + +pub fn create_download_metadata_notifier( + bg_task_sender: UnboundedSender, +) -> DownloadMetadataNotifier { + Box::new( + move |info: DownloadMetadataInfo| -> BoxFuture< + 'static, + Result<(), Box>, + > { + let (asset_data_id, uri) = info.into_inner(); + let task = DownloadMetadata { + asset_data_id, + uri, + created_at: Some(Utc::now().naive_utc()), + }; + let task = task + .into_task_data() + .and_then(|task| { + bg_task_sender.send(task).map_err(Into::into) + }) + .map_err(Into::into); + Box::pin(async move { task }) + }, + ) +} const TASK_NAME: &str = "DownloadMetadata"; diff --git a/nft_ingester/src/transaction_notifications.rs b/nft_ingester/src/transaction_notifications.rs index 02aa5d971..e3932eaf6 100644 --- a/nft_ingester/src/transaction_notifications.rs +++ b/nft_ingester/src/transaction_notifications.rs @@ -1,19 +1,22 @@ -use std::sync::Arc; - -use crate::{ - metric, metrics::capture_result, program_transformers::ProgramTransformer, tasks::TaskData, -}; -use cadence_macros::{is_global_default_set, statsd_count, statsd_time}; -use chrono::Utc; -use log::{debug, error}; -use plerkle_messenger::{ConsumptionType, Messenger, MessengerConfig, RecvData}; -use plerkle_serialization::root_as_transaction_info; - -use sqlx::{Pool, Postgres}; -use tokio::{ - sync::mpsc::UnboundedSender, - task::{JoinHandle, JoinSet}, - time::Instant, +use { + crate::{ + metric, + metrics::capture_result, + tasks::{create_download_metadata_notifier, TaskData}, + }, + cadence_macros::{is_global_default_set, statsd_count, statsd_time}, + chrono::Utc, + log::{debug, error}, + plerkle_messenger::{ConsumptionType, Messenger, MessengerConfig, RecvData}, + plerkle_serialization::root_as_transaction_info, + program_transformers::ProgramTransformer, + sqlx::{Pool, Postgres}, + std::sync::Arc, + tokio::{ + sync::mpsc::UnboundedSender, + task::{JoinHandle, JoinSet}, + time::Instant, + }, }; pub fn transaction_worker( @@ -28,7 +31,11 @@ pub fn transaction_worker( tokio::spawn(async move { let source = T::new(config).await; if let Ok(mut msg) = source { - let manager = Arc::new(ProgramTransformer::new(pool, bg_task_sender, cl_audits)); + let manager = Arc::new(ProgramTransformer::new( + pool, + create_download_metadata_notifier(bg_task_sender), + cl_audits, + )); loop { let e = msg.recv(stream_key, consumption_type.clone()).await; let mut tasks = JoinSet::new(); diff --git a/program_transformers/src/lib.rs b/program_transformers/src/lib.rs index 1b58e977b..03f582060 100644 --- a/program_transformers/src/lib.rs +++ b/program_transformers/src/lib.rs @@ -23,7 +23,7 @@ use { }; mod bubblegum; -mod error; +pub mod error; mod token; mod token_metadata; @@ -48,8 +48,10 @@ impl DownloadMetadataInfo { pub type DownloadMetadataNotifier = Box< dyn Fn( - DownloadMetadataInfo, - ) -> BoxFuture<'static, Result<(), Box>>, + DownloadMetadataInfo, + ) -> BoxFuture<'static, Result<(), Box>> + + Sync + + Send, >; pub struct ProgramTransformer {