Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async-graphql-axum = "7.0.15"
axum = "0.8.1"
chrono = "0.4.38"
bs58 = "0.5.1"
clap = { version = "4.5.4", features = ["derive", "env"] }
clap = { version = "4.5.4", features = ["derive", "env", "wrap_help"] }
derivative = "2.2.0"
diesel = { version = "2.2.7", features = [
"postgres",
Expand Down
2 changes: 2 additions & 0 deletions graph/src/components/store/err.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use diesel::result::Error as DieselError;
use thiserror::Error;
use tokio::task::JoinError;

/// Convenience alias for `Result` values whose error type is [`StoreError`].
pub type StoreResult<T> = Result<T, StoreError>;

#[derive(Error, Debug)]
pub enum StoreError {
#[error("store error: {0:#}")]
Expand Down
39 changes: 37 additions & 2 deletions graph/src/components/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@ mod err;
mod traits;
pub mod write;

use diesel::deserialize::FromSql;
use diesel::pg::Pg;
use diesel::serialize::{Output, ToSql};
use diesel::sql_types::Integer;
use diesel_derives::{AsExpression, FromSqlRow};
pub use entity_cache::{EntityCache, EntityLfuCache, GetScope, ModificationsAndCache};
use slog::Logger;

pub use super::subgraph::Entity;
pub use err::StoreError;
pub use err::{StoreError, StoreResult};
use itertools::Itertools;
use strum_macros::Display;
pub use traits::*;
Expand Down Expand Up @@ -691,7 +696,20 @@ pub struct StoredDynamicDataSource {
/// identifier only has meaning in the context of a specific instance of
/// graph-node. Only store code should ever construct or consume it; all
/// other code passes it around as an opaque token.
#[derive(Copy, Clone, CheapClone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
// `AsExpression`/`FromSqlRow` together with `#[diesel(sql_type = Integer)]`
// let a `DeploymentId` be bound to and read from `integer` columns directly
// in Diesel queries (see the `FromSql`/`ToSql` impls below).
#[derive(
Copy,
Clone,
CheapClone,
Debug,
Serialize,
Deserialize,
PartialEq,
Eq,
Hash,
AsExpression,
FromSqlRow,
)]
#[diesel(sql_type = Integer)]
pub struct DeploymentId(pub i32);

impl Display for DeploymentId {
Expand All @@ -706,6 +724,19 @@ impl DeploymentId {
}
}

impl FromSql<Integer, Pg> for DeploymentId {
fn from_sql(bytes: diesel::pg::PgValue) -> diesel::deserialize::Result<Self> {
let id = <i32 as FromSql<Integer, Pg>>::from_sql(bytes)?;
Ok(DeploymentId(id))
}
}

// Write a `DeploymentId` to a Postgres `integer` column by delegating to
// the `ToSql` impl of the wrapped `i32`.
impl ToSql<Integer, Pg> for DeploymentId {
fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Pg>) -> diesel::serialize::Result {
<i32 as ToSql<Integer, Pg>>::to_sql(&self.0, out)
}
}

/// A unique identifier for a deployment that specifies both its external
/// identifier (`hash`) and its unique internal identifier (`id`) which
/// ensures we are talking about a unique location for the deployment's data
Expand Down Expand Up @@ -972,6 +1003,9 @@ pub struct PruneRequest {
pub earliest_block: BlockNumber,
/// The last block that contains final entities not subject to a reorg
pub final_block: BlockNumber,
/// The first block for which the deployment contained entities when the
/// request was made
pub first_block: BlockNumber,
/// The latest block, i.e., the subgraph head
pub latest_block: BlockNumber,
/// Use the rebuild strategy when removing more than this fraction of
Expand Down Expand Up @@ -1035,6 +1069,7 @@ impl PruneRequest {
earliest_block,
final_block,
latest_block,
first_block,
rebuild_threshold,
delete_threshold,
})
Expand Down
6 changes: 6 additions & 0 deletions graph/src/env/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ pub struct EnvVarsStore {
/// blocks) than its history limit. The default value is 1.2 and the
/// value must be at least 1.01
pub history_slack_factor: f64,
/// For how many prune runs per deployment to keep status information.
/// Set by `GRAPH_STORE_HISTORY_KEEP_STATUS`. The default is 5
pub prune_keep_history: usize,
/// How long to accumulate changes into a batch before a write has to
/// happen. Set by the environment variable
/// `GRAPH_STORE_WRITE_BATCH_DURATION` in seconds. The default is 300s.
Expand Down Expand Up @@ -184,6 +187,7 @@ impl TryFrom<InnerStore> for EnvVarsStore {
rebuild_threshold: x.rebuild_threshold.0,
delete_threshold: x.delete_threshold.0,
history_slack_factor: x.history_slack_factor.0,
prune_keep_history: x.prune_keep_status,
write_batch_duration: Duration::from_secs(x.write_batch_duration_in_secs),
write_batch_size: x.write_batch_size * 1_000,
create_gin_indexes: x.create_gin_indexes,
Expand Down Expand Up @@ -257,6 +261,8 @@ pub struct InnerStore {
delete_threshold: ZeroToOneF64,
#[envconfig(from = "GRAPH_STORE_HISTORY_SLACK_FACTOR", default = "1.2")]
history_slack_factor: HistorySlackF64,
#[envconfig(from = "GRAPH_STORE_HISTORY_KEEP_STATUS", default = "5")]
prune_keep_status: usize,
#[envconfig(from = "GRAPH_STORE_WRITE_BATCH_DURATION", default = "300")]
write_batch_duration_in_secs: u64,
#[envconfig(from = "GRAPH_STORE_WRITE_BATCH_SIZE", default = "10000")]
Expand Down
154 changes: 110 additions & 44 deletions node/src/bin/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,35 +297,13 @@ pub enum Command {
#[clap(subcommand)]
Index(IndexCommand),

/// Prune a deployment
/// Prune subgraphs by removing old entity versions
///
/// Keep only entity versions that are needed to respond to queries at
/// block heights that are within `history` blocks of the subgraph head;
/// all other entity versions are removed.
///
/// Unless `--once` is given, this setting is permanent and the subgraph
/// will periodically be pruned to remove history as the subgraph head
/// moves forward.
Prune {
/// The deployment to prune (see `help info`)
deployment: DeploymentSearch,
/// Prune by rebuilding tables when removing more than this fraction
/// of history. Defaults to GRAPH_STORE_HISTORY_REBUILD_THRESHOLD
#[clap(long, short)]
rebuild_threshold: Option<f64>,
/// Prune by deleting when removing more than this fraction of
/// history but less than rebuild_threshold. Defaults to
/// GRAPH_STORE_HISTORY_DELETE_THRESHOLD
#[clap(long, short)]
delete_threshold: Option<f64>,
/// How much history to keep in blocks. Defaults to
/// GRAPH_MIN_HISTORY_BLOCKS
#[clap(long, short = 'y')]
history: Option<usize>,
/// Prune only this once
#[clap(long, short)]
once: bool,
},
#[clap(subcommand)]
Prune(PruneCommand),

/// General database management
#[clap(subcommand)]
Expand Down Expand Up @@ -694,6 +672,67 @@ pub enum StatsCommand {
},
}

#[derive(Clone, Debug, Subcommand)]
pub enum PruneCommand {
/// Prune a deployment in the foreground
///
/// Unless `--once` is given, this setting is permanent and the subgraph
/// will periodically be pruned to remove history as the subgraph head
/// moves forward.
Run {
/// The deployment to prune (see `help info`)
deployment: DeploymentSearch,
/// Prune by rebuilding tables when removing more than this fraction
/// of history. Defaults to GRAPH_STORE_HISTORY_REBUILD_THRESHOLD
#[clap(long, short)]
rebuild_threshold: Option<f64>,
/// Prune by deleting when removing more than this fraction of
/// history but less than rebuild_threshold. Defaults to
/// GRAPH_STORE_HISTORY_DELETE_THRESHOLD
#[clap(long, short)]
delete_threshold: Option<f64>,
/// How much history to keep in blocks. Defaults to
/// GRAPH_MIN_HISTORY_BLOCKS
#[clap(long, short = 'y')]
history: Option<usize>,
/// Prune only this once
#[clap(long, short)]
once: bool,
},
/// Prune a deployment in the background
///
/// Set the amount of history the subgraph should retain. The actual
/// data removal happens in the background and can be monitored with
/// `prune status`. It can take several minutes for the first pruning to
/// start, during which time `prune status` will not return any
/// information.
Set {
/// The deployment to prune (see `help info`)
deployment: DeploymentSearch,
/// Prune by rebuilding tables when removing more than this fraction
/// of history. Defaults to GRAPH_STORE_HISTORY_REBUILD_THRESHOLD
#[clap(long, short)]
rebuild_threshold: Option<f64>,
/// Prune by deleting when removing more than this fraction of
/// history but less than rebuild_threshold. Defaults to
/// GRAPH_STORE_HISTORY_DELETE_THRESHOLD
#[clap(long, short)]
delete_threshold: Option<f64>,
/// How much history to keep in blocks. Defaults to
/// GRAPH_MIN_HISTORY_BLOCKS
#[clap(long, short = 'y')]
history: Option<usize>,
},
/// Show the status of a pruning operation
Status {
/// The number of the pruning run
#[clap(long, short)]
run: Option<usize>,
/// The deployment to check (see `help info`)
deployment: DeploymentSearch,
},
}

#[derive(Clone, Debug, Subcommand)]
pub enum IndexCommand {
/// Creates a new database index.
Expand Down Expand Up @@ -1613,25 +1652,52 @@ async fn main() -> anyhow::Result<()> {
}
}
}
Prune {
deployment,
history,
rebuild_threshold,
delete_threshold,
once,
} => {
let (store, primary_pool) = ctx.store_and_primary();
let history = history.unwrap_or(ENV_VARS.min_history_blocks.try_into()?);
commands::prune::run(
store,
primary_pool,
deployment,
history,
rebuild_threshold,
delete_threshold,
once,
)
.await
Prune(cmd) => {
use PruneCommand::*;
match cmd {
Run {
deployment,
history,
rebuild_threshold,
delete_threshold,
once,
} => {
let (store, primary_pool) = ctx.store_and_primary();
let history = history.unwrap_or(ENV_VARS.min_history_blocks.try_into()?);
commands::prune::run(
store,
primary_pool,
deployment,
history,
rebuild_threshold,
delete_threshold,
once,
)
.await
}
Set {
deployment,
rebuild_threshold,
delete_threshold,
history,
} => {
let (store, primary_pool) = ctx.store_and_primary();
let history = history.unwrap_or(ENV_VARS.min_history_blocks.try_into()?);
commands::prune::set(
store,
primary_pool,
deployment,
history,
rebuild_threshold,
delete_threshold,
)
.await
}
Status { run, deployment } => {
let (store, primary_pool) = ctx.store_and_primary();
commands::prune::status(store, primary_pool, deployment, run).await
}
}
}
Drop {
deployment,
Expand Down
28 changes: 4 additions & 24 deletions node/src/manager/commands/copy.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use diesel::{ExpressionMethods, JoinOnDsl, OptionalExtension, QueryDsl, RunQueryDsl};
use std::{collections::HashMap, sync::Arc, time::SystemTime};
use std::{collections::HashMap, sync::Arc};

use graph::{
components::store::{BlockStore as _, DeploymentId, DeploymentLocator},
Expand All @@ -19,8 +19,8 @@ use graph_store_postgres::{
};
use graph_store_postgres::{ConnectionPool, Shard, Store, SubgraphStore};

use crate::manager::deployment::DeploymentSearch;
use crate::manager::display::List;
use crate::manager::{deployment::DeploymentSearch, fmt};

type UtcDateTime = DateTime<Utc>;

Expand Down Expand Up @@ -260,26 +260,6 @@ pub fn status(pools: HashMap<Shard, ConnectionPool>, dst: &DeploymentSearch) ->
use catalog::active_copies as ac;
use catalog::deployment_schemas as ds;

fn duration(start: &UtcDateTime, end: &Option<UtcDateTime>) -> String {
let start = *start;
let end = *end;

let end = end.unwrap_or(UtcDateTime::from(SystemTime::now()));
let duration = end - start;

human_duration(duration)
}

fn human_duration(duration: Duration) -> String {
if duration.num_seconds() < 5 {
format!("{}ms", duration.num_milliseconds())
} else if duration.num_minutes() < 5 {
format!("{}s", duration.num_seconds())
} else {
format!("{}m", duration.num_minutes())
}
}

let primary = pools
.get(&*PRIMARY_SHARD)
.ok_or_else(|| anyhow!("can not find deployment with id {}", dst))?;
Expand Down Expand Up @@ -336,7 +316,7 @@ pub fn status(pools: HashMap<Shard, ConnectionPool>, dst: &DeploymentSearch) ->
state.dst.to_string(),
state.target_block_number.to_string(),
on_sync.to_str().to_string(),
duration(&state.started_at, &state.finished_at),
fmt::duration(&state.started_at, &state.finished_at),
progress,
];
match (cancelled_at, state.cancelled_at) {
Expand Down Expand Up @@ -378,7 +358,7 @@ pub fn status(pools: HashMap<Shard, ConnectionPool>, dst: &DeploymentSearch) ->
table.next_vid,
table.target_vid,
table.batch_size,
human_duration(Duration::milliseconds(table.duration_ms)),
fmt::human_duration(Duration::milliseconds(table.duration_ms)),
);
}

Expand Down
Loading
Loading