
parallelize migration
sistemd committed Mar 18, 2024
1 parent 4e995ba commit 120f35d
Showing 4 changed files with 140 additions and 45 deletions.
25 changes: 25 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

Binary file modified crates/rpc/fixtures/mainnet.sqlite
Binary file not shown.
1 change: 1 addition & 0 deletions crates/storage/Cargo.toml
@@ -17,6 +17,7 @@ cached = { workspace = true }
 const_format = { workspace = true }
 data-encoding = "2.4.0"
 fake = { workspace = true }
+flume = "0.11.0"
 hex = { workspace = true }
 lazy_static = { workspace = true }
 pathfinder-common = { path = "../common" }
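
The only new dependency is flume, and it is what makes the fan-out in revision_0051.rs possible: unlike std::sync::mpsc, flume's Receiver implements Clone, so any number of worker threads can pull jobs from one shared queue. Below is a minimal sketch of that multi-producer, multi-consumer pattern; it is not part of this commit, all names are illustrative, and it assumes flume = "0.11" in Cargo.toml as added above.

use std::thread;

fn main() {
    // Jobs fan out over flume (Receiver is Clone); results fan in over std mpsc.
    let (job_tx, job_rx) = flume::unbounded::<u64>();
    let (result_tx, result_rx) = std::sync::mpsc::channel::<u64>();

    let workers: Vec<_> = (0..4)
        .map(|_| {
            let job_rx = job_rx.clone();
            let result_tx = result_tx.clone();
            // Each worker drains the shared queue; iter() ends once all senders drop.
            thread::spawn(move || {
                for job in job_rx.iter() {
                    result_tx.send(job * 2).unwrap();
                }
            })
        })
        .collect();

    for i in 0..100u64 {
        job_tx.send(i).unwrap();
    }
    drop(job_tx); // close the queue so the workers' iterators terminate
    drop(result_tx); // drop our clone so result_rx.iter() can finish

    let total: u64 = result_rx.iter().sum();
    assert_eq!(total, (0..100u64).map(|i| i * 2).sum());

    for worker in workers {
        worker.join().unwrap();
    }
}

This mirrors the migration's structure: jobs fan out over a cloneable flume receiver, results fan in over a plain std::sync::mpsc channel, and dropping the senders is what lets every receiving iterator terminate. In the migration each transformer thread additionally owns its own zstd::bulk::Compressor, so no compression state is shared between threads.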
159 changes: 114 additions & 45 deletions crates/storage/src/schema/revision_0051.rs
@@ -1,11 +1,78 @@
-use std::mem;
+use std::{mem, sync::mpsc, thread};
 
 use anyhow::Context;
 use rusqlite::params;
 
 pub(crate) fn migrate(tx: &rusqlite::Transaction<'_>) -> anyhow::Result<()> {
     tracing::info!("migrating starknet_transactions to new format");
 
+    let mut transformers = Vec::new();
+    let (insert_tx, insert_rx) = mpsc::channel();
+    let (transform_tx, transform_rx) =
+        flume::unbounded::<(Vec<u8>, i64, Vec<u8>, Vec<u8>, Vec<u8>)>();
+    for _ in 0..thread::available_parallelism().unwrap().get() {
+        let insert_tx = insert_tx.clone();
+        let transform_rx = transform_rx.clone();
+        let mut compressor = zstd::bulk::Compressor::new(10).context("Create zstd compressor")?;
+        let transformer = thread::spawn(move || {
+            for (hash, idx, block_hash, transaction, receipt) in transform_rx.iter() {
+                // Load old DTOs.
+                let transaction = zstd::decode_all(transaction.as_slice())
+                    .context("Decompressing transaction")
+                    .unwrap();
+                let transaction: old_dto::Transaction = serde_json::from_slice(&transaction)
+                    .context("Deserializing transaction")
+                    .unwrap();
+                let transaction = pathfinder_common::transaction::Transaction::from(transaction);
+                let receipt = zstd::decode_all(receipt.as_slice())
+                    .context("Decompressing receipt")
+                    .unwrap();
+                let mut receipt: old_dto::Receipt = serde_json::from_slice(&receipt)
+                    .context("Deserializing receipt")
+                    .unwrap();
+                let events = mem::take(&mut receipt.events);
+                let receipt = pathfinder_common::receipt::Receipt::from(receipt);
+
+                // Serialize into new DTOs.
+                let transaction = crate::transaction::dto::Transaction::from(&transaction);
+                let transaction =
+                    bincode::serde::encode_to_vec(transaction, bincode::config::standard())
+                        .context("Serializing transaction")
+                        .unwrap();
+                let transaction = compressor
+                    .compress(&transaction)
+                    .context("Compressing transaction")
+                    .unwrap();
+                let receipt = crate::transaction::dto::Receipt::from(&receipt);
+                let receipt = bincode::serde::encode_to_vec(receipt, bincode::config::standard())
+                    .context("Serializing receipt")
+                    .unwrap();
+                let receipt = compressor
+                    .compress(&receipt)
+                    .context("Compressing receipt")
+                    .unwrap();
+                let events = bincode::serde::encode_to_vec(
+                    crate::transaction::dto::Events::V0 { events },
+                    bincode::config::standard(),
+                )
+                .context("Serializing events")
+                .unwrap();
+                let events = compressor
+                    .compress(&events)
+                    .context("Compressing events")
+                    .unwrap();
+
+                // Store the updated values.
+                if let Err(err) =
+                    insert_tx.send((hash, idx, block_hash, transaction, receipt, events))
+                {
+                    panic!("Failed to send transaction: {:?}", err);
+                }
+            }
+        });
+        transformers.push(transformer);
+    }
+
     tx.execute(
         r"
         CREATE TABLE starknet_transactions_new (
@@ -19,55 +86,57 @@ pub(crate) fn migrate(tx: &rusqlite::Transaction<'_>) -> anyhow::Result<()> {
         [],
     )
     .context("Creating starknet_transactions_new table")?;
-    let mut query =
+    let mut query_stmt =
         tx.prepare("SELECT hash, idx, block_hash, tx, receipt FROM starknet_transactions")?;
-    let mut insert = tx.prepare(
+    let mut insert_stmt = tx.prepare(
         r"INSERT INTO starknet_transactions_new (hash, idx, block_hash, tx, receipt, events)
         VALUES (?, ?, ?, ?, ?, ?)",
     )?;
-    let mut rows = query.query([])?;
-    let mut compressor = zstd::bulk::Compressor::new(10).context("Create zstd compressor")?;
-    while let Some(row) = rows.next()? {
-        let hash = row.get_ref_unwrap("hash").as_blob()?;
-        let idx = row.get_ref_unwrap("idx").as_i64()?;
-        let block_hash = row.get_ref_unwrap("block_hash").as_blob()?;
-
-        // Load old DTOs.
-        let transaction = row.get_ref_unwrap("tx").as_blob()?;
-        let transaction = zstd::decode_all(transaction).context("Decompressing transaction")?;
-        let transaction: old_dto::Transaction =
-            serde_json::from_slice(&transaction).context("Deserializing transaction")?;
-        let transaction = pathfinder_common::transaction::Transaction::from(transaction);
-        let receipt = row.get_ref_unwrap("receipt").as_blob()?;
-        let receipt = zstd::decode_all(receipt).context("Decompressing receipt")?;
-        let mut receipt: old_dto::Receipt =
-            serde_json::from_slice(&receipt).context("Deserializing receipt")?;
-        let events = mem::take(&mut receipt.events);
-        let receipt = pathfinder_common::receipt::Receipt::from(receipt);
-
-        // Serialize into new DTOs.
-        let transaction = crate::transaction::dto::Transaction::from(&transaction);
-        let transaction = bincode::serde::encode_to_vec(transaction, bincode::config::standard())
-            .context("Serializing transaction")?;
-        let transaction = compressor
-            .compress(&transaction)
-            .context("Compressing transaction")?;
-        let receipt = crate::transaction::dto::Receipt::from(&receipt);
-        let receipt = bincode::serde::encode_to_vec(receipt, bincode::config::standard())
-            .context("Serializing receipt")?;
-        let receipt = compressor
-            .compress(&receipt)
-            .context("Compressing receipt")?;
-        let events = bincode::serde::encode_to_vec(
-            crate::transaction::dto::Events::V0 { events },
-            bincode::config::standard(),
-        )
-        .context("Serializing events")?;
-        let events = compressor.compress(&events).context("Compressing events")?;
-
-        // Store the updated values.
-        insert.execute(params![hash, idx, block_hash, transaction, receipt, events])?;
+    const BATCH_SIZE: usize = 10_000;
+    let mut rows = query_stmt.query([])?;
+    loop {
+        let mut batch_size = 0;
+        for _ in 0..BATCH_SIZE {
+            match rows.next() {
+                Ok(Some(row)) => {
+                    let hash = row.get_ref_unwrap("hash").as_blob()?;
+                    let idx = row.get_ref_unwrap("idx").as_i64()?;
+                    let block_hash = row.get_ref_unwrap("block_hash").as_blob()?;
+                    let transaction = row.get_ref_unwrap("tx").as_blob()?;
+                    let receipt = row.get_ref_unwrap("receipt").as_blob()?;
+                    transform_tx
+                        .send((
+                            hash.to_vec(),
+                            idx,
+                            block_hash.to_vec(),
+                            transaction.to_vec(),
+                            receipt.to_vec(),
+                        ))
+                        .context("Sending transaction to transformer")?;
+                    batch_size += 1;
+                }
+                Ok(None) => break,
+                Err(err) => return Err(err.into()),
+            }
+        }
+        for _ in 0..batch_size {
+            let (hash, idx, block_hash, transaction, receipt, events) = insert_rx.recv()?;
+            insert_stmt.execute(params![hash, idx, block_hash, transaction, receipt, events])?;
+        }
+        if batch_size < BATCH_SIZE {
+            // This was the last batch.
+            break;
+        }
     }
 
+    drop(insert_tx);
+    drop(transform_tx);
+
+    // Ensure that all transformers have finished successfully.
+    for transformer in transformers {
+        transformer.join().unwrap();
+    }
+
     tx.execute("DROP TABLE starknet_transactions", [])?;
     tx.execute(
         "ALTER TABLE starknet_transactions_new RENAME TO starknet_transactions",
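
Distilled from the diff above: the rewritten loop is a bounded pipeline. It reads up to BATCH_SIZE rows from the SQLite cursor, fans them out to the transformer threads, then drains exactly batch_size transformed results back into the INSERT statement before reading the next batch, so at most BATCH_SIZE rows are in flight at any time. The following is a simplified, self-contained sketch of that pump, not the commit's code: rows are reduced to byte blobs, and transform is a hypothetical stand-in for the zstd-decompress, JSON-decode, bincode-encode, zstd-compress work.

use std::{sync::mpsc, thread};

const BATCH_SIZE: usize = 4; // 10_000 in the real migration

fn main() {
    let (job_tx, job_rx) = flume::unbounded::<Vec<u8>>();
    let (done_tx, done_rx) = mpsc::channel::<Vec<u8>>();

    // One worker per core, as in the migration.
    let workers: Vec<_> = (0..thread::available_parallelism().unwrap().get())
        .map(|_| {
            let job_rx = job_rx.clone();
            let done_tx = done_tx.clone();
            thread::spawn(move || {
                for row in job_rx.iter() {
                    done_tx.send(transform(row)).unwrap();
                }
            })
        })
        .collect();

    // Stand-in for the SQLite cursor over starknet_transactions.
    let mut rows = (0..10u8).map(|i| vec![i]);

    loop {
        // Fan out one batch...
        let mut batch_size = 0;
        for _ in 0..BATCH_SIZE {
            match rows.next() {
                Some(row) => {
                    job_tx.send(row).unwrap();
                    batch_size += 1;
                }
                None => break,
            }
        }
        // ...then drain exactly that many results (arrival order may differ).
        for _ in 0..batch_size {
            let transformed = done_rx.recv().unwrap();
            // The real code runs insert_stmt.execute(...) here.
            println!("{transformed:?}");
        }
        if batch_size < BATCH_SIZE {
            break; // fewer rows than a full batch: the cursor is exhausted
        }
    }

    drop(job_tx); // let the workers' iterators end
    for worker in workers {
        worker.join().unwrap();
    }
}

// Hypothetical stand-in for the per-row re-encoding work.
fn transform(mut row: Vec<u8>) -> Vec<u8> {
    row.push(0xff);
    row
}

Because the workers run concurrently, results can come back in a different order than the rows were read; the migration tolerates this by sending hash, idx, and block_hash through the channels alongside each payload, so every inserted row carries its own key columns.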

