Skip to content

Commit 81c6159

Browse files
authored
Merge pull request #3008 from autonomys/cpu-plotting-cli-refactoring
CPU plotting CLI refactoring
2 parents 7b69fc7 + 5240634 commit 81c6159

File tree

3 files changed

+308
-219
lines changed

3 files changed

+308
-219
lines changed
Lines changed: 102 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::commands::shared::PlottingThreadPriority;
22
use anyhow::anyhow;
3+
use async_lock::Mutex as AsyncMutex;
34
use clap::Parser;
45
use futures::{select, FutureExt};
56
use prometheus_client::registry::Registry;
@@ -17,65 +18,73 @@ use subspace_farmer::plotter::cpu::CpuPlotter;
1718
use subspace_farmer::utils::{
1819
create_plotting_thread_pool_manager, parse_cpu_cores_sets, thread_pool_core_indices,
1920
};
21+
use subspace_farmer_components::PieceGetter;
2022
use subspace_proof_of_space::Table;
2123
use tokio::sync::Semaphore;
2224
use tracing::info;
2325

24-
/// Arguments for plotter
2526
#[derive(Debug, Parser)]
26-
pub(super) struct PlotterArgs {
27-
/// Piece getter concurrency.
28-
///
29-
/// Increase can result in NATS communication issues if too many messages arrive via NATS, but
30-
/// are not processed quickly enough for some reason and might require increasing cluster-level
31-
/// `--nats-pool-size` parameter.
32-
#[arg(long, default_value = "32")]
33-
piece_getter_concurrency: NonZeroUsize,
27+
struct CpuPlottingOptions {
3428
/// Defines how many sectors farmer will download concurrently, allows to limit memory usage of
35-
/// the plotting process, defaults to `--sector-encoding-concurrency` + 1 to download future
29+
/// the plotting process, defaults to `--cpu-sector-encoding-concurrency` + 1 to download future
3630
/// sector ahead of time.
3731
///
3832
/// Increase will result in higher memory usage.
3933
#[arg(long)]
40-
sector_downloading_concurrency: Option<NonZeroUsize>,
34+
cpu_sector_downloading_concurrency: Option<NonZeroUsize>,
4135
/// Defines how many sectors farmer will encode concurrently, defaults to 1 on UMA system and
4236
/// number of NUMA nodes on NUMA system or L3 cache groups on large CPUs. It is further
4337
/// restricted by
44-
/// `--sector-downloading-concurrency` and setting this option higher than
45-
/// `--sector-downloading-concurrency` will have no effect.
38+
/// `--cpu-sector-downloading-concurrency` and setting this option higher than
39+
/// `--cpu-sector-downloading-concurrency` will have no effect.
4640
///
4741
/// Increase will result in higher memory usage.
4842
#[arg(long)]
49-
sector_encoding_concurrency: Option<NonZeroUsize>,
43+
cpu_sector_encoding_concurrency: Option<NonZeroUsize>,
5044
/// Defines how many records farmer will encode in a single sector concurrently, defaults to one
5145
/// record per 2 cores, but not more than 8 in total. Higher concurrency means higher memory
5246
/// usage and typically more efficient CPU utilization.
5347
#[arg(long)]
54-
record_encoding_concurrency: Option<NonZeroUsize>,
48+
cpu_record_encoding_concurrency: Option<NonZeroUsize>,
5549
/// Size of one thread pool used for plotting, defaults to number of logical CPUs available
5650
/// on UMA system and number of logical CPUs available in NUMA node on NUMA system or L3 cache
5751
/// groups on large CPUs.
5852
///
59-
/// Number of thread pools is defined by `--sector-encoding-concurrency` option, different
53+
/// Number of thread pools is defined by `--cpu-sector-encoding-concurrency` option, different
6054
/// thread pools might have different number of threads if NUMA nodes do not have the same size.
6155
///
6256
/// Threads will be pinned to corresponding CPU cores at creation.
6357
#[arg(long)]
64-
plotting_thread_pool_size: Option<NonZeroUsize>,
58+
cpu_plotting_thread_pool_size: Option<NonZeroUsize>,
6559
/// Specify exact CPU cores to be used for plotting bypassing any custom logic farmer might use
66-
/// otherwise. It replaces both `--sector-encoding-concurrency` and
67-
/// `--plotting-thread-pool-size` options if specified.
60+
/// otherwise. It replaces both `--cpu-sector-encoding-concurrency` and
61+
/// `--cpu-plotting-thread-pool-size` options if specified.
6862
///
6963
/// Cores are coma-separated, with whitespace separating different thread pools/encoding
7064
/// instances. For example "0,1 2,3" will result in two sectors being encoded at the same time,
7165
/// each with a pair of CPU cores.
72-
#[arg(long, conflicts_with_all = & ["sector_encoding_concurrency", "plotting_thread_pool_size"])]
73-
plotting_cpu_cores: Option<String>,
66+
#[arg(long, conflicts_with_all = & ["cpu_sector_encoding_concurrency", "cpu_plotting_thread_pool_size"])]
67+
cpu_plotting_cores: Option<String>,
7468
/// Plotting thread priority, by default de-prioritizes plotting threads in order to make sure
7569
/// farming is successful and computer can be used comfortably for other things. Can be set to
7670
/// "min", "max" or "default".
7771
#[arg(long, default_value_t = PlottingThreadPriority::Min)]
78-
plotting_thread_priority: PlottingThreadPriority,
72+
cpu_plotting_thread_priority: PlottingThreadPriority,
73+
}
74+
75+
/// Arguments for plotter
76+
#[derive(Debug, Parser)]
77+
pub(super) struct PlotterArgs {
78+
/// Piece getter concurrency.
79+
///
80+
/// Increase can result in NATS communication issues if too many messages arrive via NATS, but
81+
/// are not processed quickly enough for some reason and might require increasing cluster-level
82+
/// `--nats-pool-size` parameter.
83+
#[arg(long, default_value = "32")]
84+
piece_getter_concurrency: NonZeroUsize,
85+
/// Plotting options only used by CPU plotter
86+
#[clap(flatten)]
87+
cpu_plotting_options: CpuPlottingOptions,
7988
/// Additional cluster components
8089
#[clap(raw = true)]
8190
pub(super) additional_components: Vec<String>,
@@ -92,12 +101,7 @@ where
92101
{
93102
let PlotterArgs {
94103
piece_getter_concurrency,
95-
sector_downloading_concurrency,
96-
sector_encoding_concurrency,
97-
record_encoding_concurrency,
98-
plotting_thread_pool_size,
99-
plotting_cpu_cores,
100-
plotting_thread_priority,
104+
cpu_plotting_options,
101105
additional_components: _,
102106
} = plotter_args;
103107

@@ -109,13 +113,62 @@ where
109113
.map_err(|error| anyhow!("Failed to instantiate erasure coding: {error}"))?;
110114
let piece_getter = ClusterPieceGetter::new(nats_client.clone(), piece_getter_concurrency);
111115

116+
let global_mutex = Arc::default();
117+
118+
let (legacy_cpu_plotter, modern_cpu_plotter) = init_cpu_plotters::<_, PosTableLegacy, PosTable>(
119+
cpu_plotting_options,
120+
piece_getter,
121+
global_mutex,
122+
kzg,
123+
erasure_coding,
124+
registry,
125+
)?;
126+
let legacy_cpu_plotter = Arc::new(legacy_cpu_plotter);
127+
let modern_cpu_plotter = Arc::new(modern_cpu_plotter);
128+
129+
Ok(Box::pin(async move {
130+
select! {
131+
result = plotter_service(&nats_client, &legacy_cpu_plotter, false).fuse() => {
132+
result.map_err(|error| anyhow!("Plotter service failed: {error}"))
133+
}
134+
result = plotter_service(&nats_client, &modern_cpu_plotter, true).fuse() => {
135+
result.map_err(|error| anyhow!("Plotter service failed: {error}"))
136+
}
137+
}
138+
}))
139+
}
140+
141+
fn init_cpu_plotters<PG, PosTableLegacy, PosTable>(
142+
cpu_plotting_options: CpuPlottingOptions,
143+
piece_getter: PG,
144+
global_mutex: Arc<AsyncMutex<()>>,
145+
kzg: Kzg,
146+
erasure_coding: ErasureCoding,
147+
registry: &mut Registry,
148+
) -> anyhow::Result<(CpuPlotter<PG, PosTableLegacy>, CpuPlotter<PG, PosTable>)>
149+
where
150+
PG: PieceGetter + Clone + Send + Sync + 'static,
151+
PosTableLegacy: Table,
152+
PosTable: Table,
153+
{
154+
let CpuPlottingOptions {
155+
cpu_sector_downloading_concurrency,
156+
cpu_sector_encoding_concurrency,
157+
cpu_record_encoding_concurrency,
158+
cpu_plotting_thread_pool_size,
159+
cpu_plotting_cores,
160+
cpu_plotting_thread_priority,
161+
} = cpu_plotting_options;
162+
112163
let plotting_thread_pool_core_indices;
113-
if let Some(plotting_cpu_cores) = plotting_cpu_cores {
114-
plotting_thread_pool_core_indices = parse_cpu_cores_sets(&plotting_cpu_cores)
115-
.map_err(|error| anyhow!("Failed to parse `--plotting-cpu-cores`: {error}"))?;
164+
if let Some(cpu_plotting_cores) = cpu_plotting_cores {
165+
plotting_thread_pool_core_indices = parse_cpu_cores_sets(&cpu_plotting_cores)
166+
.map_err(|error| anyhow!("Failed to parse `--cpu-plotting-cpu-cores`: {error}"))?;
116167
} else {
117-
plotting_thread_pool_core_indices =
118-
thread_pool_core_indices(plotting_thread_pool_size, sector_encoding_concurrency);
168+
plotting_thread_pool_core_indices = thread_pool_core_indices(
169+
cpu_plotting_thread_pool_size,
170+
cpu_sector_encoding_concurrency,
171+
);
119172

120173
if plotting_thread_pool_core_indices.len() > 1 {
121174
info!(
@@ -126,12 +179,12 @@ where
126179
}
127180

128181
let downloading_semaphore = Arc::new(Semaphore::new(
129-
sector_downloading_concurrency
130-
.map(|sector_downloading_concurrency| sector_downloading_concurrency.get())
182+
cpu_sector_downloading_concurrency
183+
.map(|cpu_sector_downloading_concurrency| cpu_sector_downloading_concurrency.get())
131184
.unwrap_or(plotting_thread_pool_core_indices.len() + 1),
132185
));
133186

134-
let record_encoding_concurrency = record_encoding_concurrency.unwrap_or_else(|| {
187+
let cpu_record_encoding_concurrency = cpu_record_encoding_concurrency.unwrap_or_else(|| {
135188
let cpu_cores = plotting_thread_pool_core_indices
136189
.first()
137190
.expect("Guaranteed to have some CPU cores; qed");
@@ -157,39 +210,30 @@ where
157210
plotting_thread_pool_core_indices
158211
.into_iter()
159212
.zip(replotting_thread_pool_core_indices),
160-
plotting_thread_priority.into(),
213+
cpu_plotting_thread_priority.into(),
161214
)
162215
.map_err(|error| anyhow!("Failed to create thread pool manager: {error}"))?;
163-
let global_mutex = Arc::default();
164-
let legacy_cpu_plotter = Arc::new(CpuPlotter::<_, PosTableLegacy>::new(
216+
217+
let legacy_cpu_plotter = CpuPlotter::<_, PosTableLegacy>::new(
165218
piece_getter.clone(),
166219
Arc::clone(&downloading_semaphore),
167220
plotting_thread_pool_manager.clone(),
168-
record_encoding_concurrency,
221+
cpu_record_encoding_concurrency,
169222
Arc::clone(&global_mutex),
170223
kzg.clone(),
171224
erasure_coding.clone(),
172225
Some(registry),
173-
));
174-
let modern_cpu_plotter = Arc::new(CpuPlotter::<_, PosTable>::new(
175-
piece_getter.clone(),
226+
);
227+
let modern_cpu_plotter = CpuPlotter::<_, PosTable>::new(
228+
piece_getter,
176229
downloading_semaphore,
177230
plotting_thread_pool_manager,
178-
record_encoding_concurrency,
179-
Arc::clone(&global_mutex),
180-
kzg.clone(),
181-
erasure_coding.clone(),
231+
cpu_record_encoding_concurrency,
232+
global_mutex,
233+
kzg,
234+
erasure_coding,
182235
Some(registry),
183-
));
236+
);
184237

185-
Ok(Box::pin(async move {
186-
select! {
187-
result = plotter_service(&nats_client, &legacy_cpu_plotter, false).fuse() => {
188-
result.map_err(|error| anyhow!("Plotter service failed: {error}"))
189-
}
190-
result = plotter_service(&nats_client, &modern_cpu_plotter, true).fuse() => {
191-
result.map_err(|error| anyhow!("Plotter service failed: {error}"))
192-
}
193-
}
194-
}))
238+
Ok((legacy_cpu_plotter, modern_cpu_plotter))
195239
}

0 commit comments

Comments
 (0)