Skip to content

Commit 28f3a2e

Browse files
joshieDoMikhail SozinSozinMshekhirin
authored
feat: add EtlConfig as well as setting the directory to datadir (#7124)
Co-authored-by: Mikhail Sozin <mikhail.sozin@chainstack.com> Co-authored-by: Misha <mikawamp@gmail.com> Co-authored-by: Alexey Shekhirin <a.shekhirin@gmail.com>
1 parent 5d6ac4c commit 28f3a2e

File tree

18 files changed

+118
-43
lines changed

18 files changed

+118
-43
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bin/reth/src/commands/debug_cmd/execution.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ impl Command {
127127
header_downloader,
128128
body_downloader,
129129
factory.clone(),
130-
stage_conf.etl.etl_file_size,
130+
stage_conf.etl.clone(),
131131
)
132132
.set(SenderRecoveryStage {
133133
commit_threshold: stage_conf.sender_recovery.commit_threshold,

bin/reth/src/commands/import.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,6 @@ impl ImportCommand {
179179

180180
let max_block = file_client.max_block().unwrap_or(0);
181181

182-
let etl_file_size = config.stages.etl.etl_file_size;
183-
184182
let mut pipeline = Pipeline::builder()
185183
.with_tip_sender(tip_tx)
186184
// we want to sync all blocks the file client provides or 0 if empty
@@ -193,7 +191,7 @@ impl ImportCommand {
193191
header_downloader,
194192
body_downloader,
195193
factory.clone(),
196-
etl_file_size,
194+
config.stages.etl,
197195
)
198196
.set(SenderRecoveryStage {
199197
commit_threshold: config.stages.sender_recovery.commit_threshold,

bin/reth/src/commands/stage/run.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use crate::{
1414
};
1515
use clap::Parser;
1616
use reth_beacon_consensus::BeaconConsensus;
17-
use reth_config::Config;
17+
use reth_config::{config::EtlConfig, Config};
1818
use reth_db::init_db;
1919
use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder;
2020
use reth_node_ethereum::EthEvmConfig;
@@ -86,6 +86,10 @@ pub struct Command {
8686
#[arg(long)]
8787
etl_file_size: Option<usize>,
8888

89+
/// Directory where to collect ETL files
90+
#[arg(long)]
91+
etl_dir: Option<PathBuf>,
92+
8993
/// Normally, running the stage requires unwinding for stages that already
9094
/// have been run, in order to not rewrite to the same database slots.
9195
///
@@ -155,7 +159,12 @@ impl Command {
155159

156160
let batch_size = self.batch_size.unwrap_or(self.to - self.from + 1);
157161

158-
let etl_file_size = self.etl_file_size.unwrap_or(500 * 1024 * 1024);
162+
let etl_config = EtlConfig::new(
163+
Some(
164+
self.etl_dir.unwrap_or_else(|| EtlConfig::from_datadir(&data_dir.data_dir_path())),
165+
),
166+
self.etl_file_size.unwrap_or(EtlConfig::default_file_size()),
167+
);
159168

160169
let (mut exec_stage, mut unwind_stage): (Box<dyn Stage<_>>, Option<Box<dyn Stage<_>>>) =
161170
match self.stage {
@@ -235,7 +244,7 @@ impl Command {
235244
)
236245
}
237246
StageEnum::TxLookup => {
238-
(Box::new(TransactionLookupStage::new(batch_size, etl_file_size, None)), None)
247+
(Box::new(TransactionLookupStage::new(batch_size, etl_config, None)), None)
239248
}
240249
StageEnum::AccountHashing => {
241250
(Box::new(AccountHashingStage::new(1, batch_size)), None)

book/cli/reth/recover/storage-tries.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,20 @@ Options:
4141
-h, --help
4242
Print help (see a summary with '-h')
4343
44+
Database:
45+
--db.log-level <LOG_LEVEL>
46+
Database logging level. Levels higher than "notice" require a debug build
47+
48+
Possible values:
49+
- fatal: Enables logging for critical conditions, i.e. assertion failures
50+
- error: Enables logging for error conditions
51+
- warn: Enables logging for warning conditions
52+
- notice: Enables logging for normal but significant condition
53+
- verbose: Enables logging for verbose informational
54+
- debug: Enables logging for debug-level messages
55+
- trace: Enables logging for trace debug-level messages
56+
- extra: Enables logging for extra debug-level messages
57+
4458
Logging:
4559
--log.stdout.format <FORMAT>
4660
The format to use for logs written to stdout

book/cli/reth/stage/run.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,10 @@ Options:
6262
Batch size for stage execution and unwind
6363

6464
--etl-file-size <ETL_FILE_SIZE>
65-
Size for temporary file during ETL stages
65+
The maximum size in bytes of data held in memory before being flushed to disk as a file
66+
67+
--etl-dir <ETL_DIR>
68+
Directory where to collect ETL files
6669

6770
-s, --skip-unwind
6871
Normally, running the stage requires unwinding for stages that already have been run, in order to not rewrite to the same database slots.

crates/config/src/config.rs

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@ use reth_network::{NetworkConfigBuilder, PeersConfig, SessionsConfig};
55
use reth_primitives::PruneModes;
66
use secp256k1::SecretKey;
77
use serde::{Deserialize, Deserializer, Serialize};
8-
use std::{path::PathBuf, time::Duration};
8+
use std::{
9+
path::{Path, PathBuf},
10+
time::Duration,
11+
};
912

1013
/// Configuration for the reth node.
1114
#[derive(Debug, Clone, Default, Deserialize, PartialEq, Serialize)]
@@ -238,16 +241,36 @@ impl Default for TransactionLookupConfig {
238241
}
239242

240243
/// Common ETL related configuration.
241-
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)]
244+
#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)]
242245
#[serde(default)]
243246
pub struct EtlConfig {
247+
/// Data directory where temporary files are created.
248+
pub dir: Option<PathBuf>,
244249
/// The maximum size in bytes of data held in memory before being flushed to disk as a file.
245-
pub etl_file_size: usize,
250+
pub file_size: usize,
246251
}
247252

248253
impl Default for EtlConfig {
249254
fn default() -> Self {
250-
Self { etl_file_size: 500 * (1024 * 1024) }
255+
Self { dir: None, file_size: Self::default_file_size() }
256+
}
257+
}
258+
259+
impl EtlConfig {
260+
/// Creates an ETL configuration
261+
pub fn new(dir: Option<PathBuf>, file_size: usize) -> Self {
262+
Self { dir, file_size }
263+
}
264+
265+
/// Return default ETL directory from datadir path.
266+
pub fn from_datadir(path: &Path) -> PathBuf {
267+
path.join("etl-tmp")
268+
}
269+
270+
/// Default size in bytes of data held in memory before being flushed to disk as a file.
271+
pub const fn default_file_size() -> usize {
272+
// 500 MB
273+
500 * (1024 * 1024)
251274
}
252275
}
253276

crates/consensus/beacon/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ reth-tracing.workspace = true
5555
reth-revm.workspace = true
5656
reth-downloaders.workspace = true
5757
reth-node-ethereum.workspace = true
58+
reth-config.workspace = true
5859

5960
assert_matches.workspace = true
6061

crates/consensus/beacon/src/engine/test_utils.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::{
66
use reth_blockchain_tree::{
77
config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree, ShareableBlockchainTree,
88
};
9+
use reth_config::config::EtlConfig;
910
use reth_db::{test_utils::TempDatabase, DatabaseEnv as DE};
1011
type DatabaseEnv = TempDatabase<DE>;
1112
use reth_downloaders::{
@@ -406,7 +407,7 @@ where
406407
header_downloader,
407408
body_downloader,
408409
executor_factory.clone(),
409-
500 * (1024 * 1024),
410+
EtlConfig::default(),
410411
))
411412
}
412413
};

crates/etl/src/lib.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use std::{
1818
cmp::Reverse,
1919
collections::BinaryHeap,
2020
io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write},
21-
path::Path,
21+
path::{Path, PathBuf},
2222
};
2323

2424
use rayon::prelude::*;
@@ -42,6 +42,8 @@ where
4242
<K as Encode>::Encoded: std::fmt::Debug,
4343
<V as Compress>::Compressed: std::fmt::Debug,
4444
{
45+
/// Parent directory where to create ETL files
46+
parent_dir: Option<PathBuf>,
4547
/// Directory for temporary file storage
4648
dir: Option<TempDir>,
4749
/// Collection of temporary ETL files
@@ -66,8 +68,9 @@ where
6668
/// Create a new collector with some capacity.
6769
///
6870
/// Once the capacity (in bytes) is reached, the data is sorted and flushed to disk.
69-
pub fn new(buffer_capacity_bytes: usize) -> Self {
71+
pub fn new(buffer_capacity_bytes: usize, parent_dir: Option<PathBuf>) -> Self {
7072
Self {
73+
parent_dir,
7174
dir: None,
7275
buffer_size_bytes: 0,
7376
files: Vec::new(),
@@ -115,7 +118,15 @@ where
115118
/// doesn't exist, it will be created.
116119
fn dir(&mut self) -> io::Result<&TempDir> {
117120
if self.dir.is_none() {
118-
self.dir = Some(TempDir::new()?);
121+
self.dir = match &self.parent_dir {
122+
Some(dir) => {
123+
if !dir.exists() {
124+
std::fs::create_dir_all(dir)?;
125+
}
126+
Some(TempDir::new_in(dir)?)
127+
}
128+
None => Some(TempDir::new()?),
129+
};
119130
}
120131
Ok(self.dir.as_ref().unwrap())
121132
}
@@ -273,7 +284,7 @@ mod tests {
273284
let mut entries: Vec<_> =
274285
(0..10_000).map(|id| (TxHash::random(), id as TxNumber)).collect();
275286

276-
let mut collector = Collector::new(1024);
287+
let mut collector = Collector::new(1024, None);
277288
assert!(collector.dir.is_none());
278289

279290
for (k, v) in entries.clone() {

crates/node-builder/src/builder.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use reth_beacon_consensus::{
1919
BeaconConsensusEngine,
2020
};
2121
use reth_blockchain_tree::{BlockchainTreeConfig, ShareableBlockchainTree};
22+
use reth_config::config::EtlConfig;
2223
use reth_db::{
2324
database::Database,
2425
database_metrics::{DatabaseMetadata, DatabaseMetrics},
@@ -512,7 +513,7 @@ where
512513
executor,
513514
data_dir,
514515
mut config,
515-
reth_config,
516+
mut reth_config,
516517
..
517518
} = ctx;
518519

@@ -556,6 +557,11 @@ where
556557
hooks.add(StaticFileHook::new(static_file_producer.clone(), Box::new(executor.clone())));
557558
info!(target: "reth::cli", "StaticFileProducer initialized");
558559

560+
// Make sure ETL doesn't default to /tmp/, but to whatever datadir is set to
561+
if reth_config.stages.etl.dir.is_none() {
562+
reth_config.stages.etl.dir = Some(EtlConfig::from_datadir(&data_dir.data_dir_path()));
563+
}
564+
559565
// Configure the pipeline
560566
let (mut pipeline, client) = if config.dev.dev {
561567
info!(target: "reth::cli", "Starting Reth in dev mode");

crates/node-core/src/node_config.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -837,7 +837,7 @@ impl NodeConfig {
837837
header_downloader,
838838
body_downloader,
839839
factory.clone(),
840-
stage_config.etl.etl_file_size,
840+
stage_config.etl.clone(),
841841
)
842842
.set(SenderRecoveryStage {
843843
commit_threshold: stage_config.sender_recovery.commit_threshold,
@@ -871,7 +871,7 @@ impl NodeConfig {
871871
.set(MerkleStage::new_execution(stage_config.merkle.clean_threshold))
872872
.set(TransactionLookupStage::new(
873873
stage_config.transaction_lookup.chunk_size,
874-
stage_config.etl.etl_file_size,
874+
stage_config.etl.clone(),
875875
prune_modes.transaction_lookup,
876876
))
877877
.set(IndexAccountHistoryStage::new(

crates/stages/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ reth-trie = { workspace = true, features = ["metrics"] }
2222
reth-tokio-util.workspace = true
2323
reth-etl.workspace = true
2424
reth-static-file.workspace = true
25+
reth-config.workspace = true
2526

2627
# async
2728
tokio = { workspace = true, features = ["sync"] }

crates/stages/benches/criterion.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use criterion::{
44
BenchmarkGroup, Criterion,
55
};
66
use pprof::criterion::{Output, PProfProfiler};
7+
use reth_config::config::EtlConfig;
78
use reth_db::{test_utils::TempDatabase, DatabaseEnv};
89

910
use reth_primitives::{stage::StageCheckpoint, BlockNumber};
@@ -57,7 +58,7 @@ fn transaction_lookup(c: &mut Criterion) {
5758
let mut group = c.benchmark_group("Stages");
5859
// don't need to run each stage for that many times
5960
group.sample_size(10);
60-
let stage = TransactionLookupStage::new(DEFAULT_NUM_BLOCKS, 500 * 1024 * 1024, None);
61+
let stage = TransactionLookupStage::new(DEFAULT_NUM_BLOCKS, EtlConfig::default(), None);
6162

6263
let db = setup::txs_testdata(DEFAULT_NUM_BLOCKS);
6364

crates/stages/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
//! # use reth_provider::HeaderSyncMode;
2828
//! # use reth_provider::test_utils::create_test_provider_factory;
2929
//! # use reth_static_file::StaticFileProducer;
30+
//! # use reth_config::config::EtlConfig;
3031
//! #
3132
//! # let chain_spec = MAINNET.clone();
3233
//! # let consensus: Arc<dyn Consensus> = Arc::new(TestConsensus::default());
@@ -59,7 +60,7 @@
5960
//! headers_downloader,
6061
//! bodies_downloader,
6162
//! executor_factory,
62-
//! 500*1024*1024,
63+
//! EtlConfig::default(),
6364
//! )
6465
//! )
6566
//! .build(provider_factory, static_file_producer);

0 commit comments

Comments
 (0)