From 16521ee8e69ab78999edecd108f2d2c06e327b9c Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Wed, 9 Apr 2025 17:45:15 -0700 Subject: [PATCH 1/3] store: Do not use a fdw connection to check active_copies Copying checks the active_copies table through the primary_public.active_copies foreign table to determine whether the copy has been cancelled and it should stop. With a large number of copies running, that causes a large number of postgres_fdw connections into the primary, which can overwhelm the primary. Instead, we now pass the connection pool for the primary into the copy code so that it can do this check without involving postgres_fdw. --- store/postgres/src/copy.rs | 73 ++++++++------------------ store/postgres/src/deployment_store.rs | 15 ++++-- store/postgres/src/primary.rs | 51 +++++++++++++++++- store/postgres/src/subgraph_store.rs | 9 +++- 4 files changed, 90 insertions(+), 58 deletions(-) diff --git a/store/postgres/src/copy.rs b/store/postgres/src/copy.rs index effe2950ee2..a20c9b2a29d 100644 --- a/store/postgres/src/copy.rs +++ b/store/postgres/src/copy.rs @@ -37,7 +37,7 @@ use graph::{ info, lazy_static, o, warn, BlockNumber, BlockPtr, CheapClone, Logger, StoreError, ENV_VARS, }, schema::EntityType, - slog::{debug, error}, + slog::error, tokio, }; use itertools::Itertools; @@ -45,7 +45,7 @@ use itertools::Itertools; use crate::{ advisory_lock, catalog, deployment, dynds::DataSourcesTable, - primary::{DeploymentId, Site}, + primary::{DeploymentId, Primary, Site}, relational::index::IndexList, vid_batcher::{VidBatcher, VidRange}, }; @@ -104,46 +104,6 @@ table! { } } -// This is the same as primary::active_copies, but mapped into each shard -table! { - primary_public.active_copies(dst) { - src -> Integer, - dst -> Integer, - cancelled_at -> Nullable, - } -} - -/// Return `true` if the site is the source of a copy operation. The copy -/// operation might be just queued or in progress already. This method will -/// block until a fdw connection becomes available. -pub fn is_source(logger: &Logger, pool: &ConnectionPool, site: &Site) -> Result { - use active_copies as ac; - - // We use a fdw connection to check if the site is being copied. If we - // used an ordinary connection and there are many calls to this method, - // postgres_fdw might open an unmanageable number of connections into - // the primary, which makes the primary run out of connections - let mut last_log = Instant::now(); - let mut conn = pool.get_fdw(&logger, || { - if last_log.elapsed() > LOG_INTERVAL { - last_log = Instant::now(); - debug!( - logger, - "Waiting for fdw connection to check if site {} is being copied", site.namespace - ); - } - false - })?; - - select(diesel::dsl::exists( - ac::table - .filter(ac::src.eq(site.id)) - .filter(ac::cancelled_at.is_null()), - )) - .get_result::(&mut conn) - .map_err(StoreError::from) -} - #[derive(Copy, Clone, PartialEq, Eq, Debug)] pub enum Status { Finished, @@ -161,6 +121,7 @@ struct CopyState { impl CopyState { fn new( conn: &mut PgConnection, + primary: Primary, src: Arc, dst: Arc, target_block: BlockPtr, @@ -199,9 +160,9 @@ impl CopyState { src.site.id )); } - Self::load(conn, src, dst, target_block) + Self::load(conn, primary, src, dst, target_block) } - None => Self::create(conn, src, dst, target_block), + None => Self::create(conn, primary.cheap_clone(), src, dst, target_block), }?; Ok(state) @@ -209,11 +170,12 @@ impl CopyState { fn load( conn: &mut PgConnection, + primary: Primary, src: Arc, dst: Arc, target_block: BlockPtr, ) -> Result { - let tables = TableState::load(conn, src.as_ref(), dst.as_ref())?; + let tables = TableState::load(conn, primary, src.as_ref(), dst.as_ref())?; let (finished, mut unfinished): (Vec<_>, Vec<_>) = tables.into_iter().partition(|table| table.finished()); unfinished.sort_by_key(|table| table.dst.object.to_string()); @@ -228,6 +190,7 @@ impl CopyState { fn create( conn: &mut PgConnection, + primary: Primary, src: Arc, dst: Arc, target_block: BlockPtr, @@ -253,6 +216,7 @@ impl CopyState { .map(|src_table| { TableState::init( conn, + primary.cheap_clone(), dst.site.clone(), &src, src_table.clone(), @@ -354,6 +318,7 @@ pub(crate) fn source( /// transformation. See `CopyEntityBatchQuery` for the details of what /// exactly that means struct TableState { + primary: Primary, src: Arc, dst: Arc
, dst_site: Arc, @@ -364,6 +329,7 @@ struct TableState { impl TableState { fn init( conn: &mut PgConnection, + primary: Primary, dst_site: Arc, src_layout: &Layout, src: Arc
, @@ -373,6 +339,7 @@ impl TableState { let vid_range = VidRange::for_copy(conn, &src, target_block)?; let batcher = VidBatcher::load(conn, &src_layout.site.namespace, src.as_ref(), vid_range)?; Ok(Self { + primary, src, dst, dst_site, @@ -387,6 +354,7 @@ impl TableState { fn load( conn: &mut PgConnection, + primary: Primary, src_layout: &Layout, dst_layout: &Layout, ) -> Result, StoreError> { @@ -450,6 +418,7 @@ impl TableState { .with_batch_size(size as usize); Ok(TableState { + primary: primary.cheap_clone(), src, dst, dst_site: dst_layout.site.clone(), @@ -516,13 +485,8 @@ impl TableState { } fn is_cancelled(&self, conn: &mut PgConnection) -> Result { - use active_copies as ac; - let dst = self.dst_site.as_ref(); - let canceled = ac::table - .filter(ac::dst.eq(dst.id)) - .select(ac::cancelled_at.is_not_null()) - .get_result::(conn)?; + let canceled = self.primary.is_copy_cancelled(dst)?; if canceled { use copy_state as cs; @@ -893,6 +857,7 @@ pub struct Connection { /// `self.transaction` conn: Option, pool: ConnectionPool, + primary: Primary, workers: usize, src: Arc, dst: Arc, @@ -910,6 +875,7 @@ impl Connection { /// is available. pub fn new( logger: &Logger, + primary: Primary, pool: ConnectionPool, src: Arc, dst: Arc, @@ -942,6 +908,7 @@ impl Connection { logger, conn, pool, + primary, workers: ENV_VARS.store.batch_workers, src, dst, @@ -1079,7 +1046,9 @@ impl Connection { let src = self.src.clone(); let dst = self.dst.clone(); let target_block = self.target_block.clone(); - let mut state = self.transaction(|conn| CopyState::new(conn, src, dst, target_block))?; + let primary = self.primary.cheap_clone(); + let mut state = + self.transaction(|conn| CopyState::new(conn, primary, src, dst, target_block))?; let progress = Arc::new(CopyProgress::new(self.logger.cheap_clone(), &state)); progress.start(); diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index c78b06be46d..e497430c2bf 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -51,11 +51,11 @@ use crate::block_range::{BLOCK_COLUMN, BLOCK_RANGE_COLUMN}; use crate::deployment::{self, OnSync}; use crate::detail::ErrorDetail; use crate::dynds::DataSourcesTable; -use crate::primary::DeploymentId; +use crate::primary::{DeploymentId, Primary}; use crate::relational::index::{CreateIndex, IndexList, Method}; use crate::relational::{Layout, LayoutCache, SqlName, Table}; use crate::relational_queries::FromEntityData; -use crate::{advisory_lock, catalog, copy, retry}; +use crate::{advisory_lock, catalog, retry}; use crate::{connection_pool::ConnectionPool, detail}; use crate::{dynds, primary::Site}; @@ -93,6 +93,8 @@ type PruneHandle = JoinHandle>; pub struct StoreInner { logger: Logger, + primary: Primary, + pool: ConnectionPool, read_only_pools: Vec, @@ -130,6 +132,7 @@ impl Deref for DeploymentStore { impl DeploymentStore { pub fn new( logger: &Logger, + primary: Primary, pool: ConnectionPool, read_only_pools: Vec, mut pool_weights: Vec, @@ -160,6 +163,7 @@ impl DeploymentStore { // Create the store let store = StoreInner { logger: logger.clone(), + primary, pool, read_only_pools, replica_order, @@ -1235,7 +1239,7 @@ impl DeploymentStore { req: PruneRequest, ) -> Result<(), StoreError> { { - if copy::is_source(&logger, &store.pool, &site)? { + if store.is_source(&site)? { debug!( logger, "Skipping pruning since this deployment is being copied" @@ -1520,6 +1524,7 @@ impl DeploymentStore { // with the corresponding tables in `self` let copy_conn = crate::copy::Connection::new( logger, + self.primary.cheap_clone(), self.pool.clone(), src.clone(), dst.clone(), @@ -1848,6 +1853,10 @@ impl DeploymentStore { }) .await } + + fn is_source(&self, site: &Site) -> Result { + self.primary.is_source(site) + } } /// Tries to fetch a [`Table`] either by its Entity name or its SQL name. diff --git a/store/postgres/src/primary.rs b/store/postgres/src/primary.rs index f329ae4bba2..39df898ba32 100644 --- a/store/postgres/src/primary.rs +++ b/store/postgres/src/primary.rs @@ -36,6 +36,7 @@ use graph::{ store::scalar::ToPrimitive, subgraph::{status, DeploymentFeatures}, }, + derive::CheapClone, prelude::{ anyhow, chrono::{DateTime, Utc}, @@ -53,9 +54,9 @@ use maybe_owned::MaybeOwnedMut; use std::{ borrow::Borrow, collections::HashMap, - convert::TryFrom, - convert::TryInto, + convert::{TryFrom, TryInto}, fmt, + sync::Arc, time::{SystemTime, UNIX_EPOCH}, }; @@ -1826,6 +1827,52 @@ impl<'a> Connection<'a> { } } +/// A limited interface to query the primary database. +#[derive(Clone, CheapClone)] +pub struct Primary { + pool: Arc, +} + +impl Primary { + pub fn new(pool: Arc) -> Self { + // This really indicates a programming error + if pool.shard != *PRIMARY_SHARD { + panic!("Primary pool must be the primary shard"); + } + + Primary { pool } + } + + /// Return `true` if the site is the source of a copy operation. The copy + /// operation might be just queued or in progress already. This method will + /// block until a fdw connection becomes available. + pub fn is_source(&self, site: &Site) -> Result { + use active_copies as ac; + + let mut conn = self.pool.get()?; + + select(diesel::dsl::exists( + ac::table + .filter(ac::src.eq(site.id)) + .filter(ac::cancelled_at.is_null()), + )) + .get_result::(&mut conn) + .map_err(StoreError::from) + } + + pub fn is_copy_cancelled(&self, dst: &Site) -> Result { + use active_copies as ac; + + let mut conn = self.pool.get()?; + + ac::table + .filter(ac::dst.eq(dst.id)) + .select(ac::cancelled_at.is_not_null()) + .get_result::(&mut conn) + .map_err(StoreError::from) + } +} + /// Return `true` if we deem this installation to be empty, defined as /// having no deployments and no subgraph names in the database pub fn is_empty(conn: &mut PgConnection) -> Result { diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index 0beeadf345d..339c66cee3f 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -39,7 +39,7 @@ use graph::{ use crate::{ connection_pool::ConnectionPool, deployment::{OnSync, SubgraphHealth}, - primary::{self, DeploymentId, Mirror as PrimaryMirror, Site}, + primary::{self, DeploymentId, Mirror as PrimaryMirror, Primary, Site}, relational::{ index::{IndexList, Method}, Layout, @@ -360,6 +360,12 @@ impl SubgraphStoreInner { sender: Arc, registry: Arc, ) -> Self { + let primary = stores + .iter() + .find(|(name, _, _, _)| name == &*PRIMARY_SHARD) + .map(|(_, pool, _, _)| Primary::new(Arc::new(pool.clone()))) + .expect("primary shard must be present"); + let mirror = { let pools = HashMap::from_iter( stores @@ -376,6 +382,7 @@ impl SubgraphStoreInner { name, Arc::new(DeploymentStore::new( &logger, + primary.cheap_clone(), main_pool, read_only_pools, weights, From ee88833b53f548200eda2e16840511e7cbb05c29 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Wed, 9 Apr 2025 18:33:27 -0700 Subject: [PATCH 2/3] store: Log more when copy workers have an error --- store/postgres/src/copy.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/store/postgres/src/copy.rs b/store/postgres/src/copy.rs index a20c9b2a29d..fd736276cbd 100644 --- a/store/postgres/src/copy.rs +++ b/store/postgres/src/copy.rs @@ -1091,6 +1091,7 @@ impl Connection { W::Err(e) => { // This is a panic in the background task. We need to // cancel all other tasks and return the error + error!(self.logger, "copy worker panicked: {}", e); self.cancel_workers(progress, workers).await; return Err(e); } @@ -1115,6 +1116,7 @@ impl Connection { return Ok(Status::Cancelled); } (Err(e), _) => { + error!(self.logger, "copy worker had an error: {}", e); self.cancel_workers(progress, workers).await; return Err(e); } From 63b2a5c2d3a67e21ffbaac2d74a843cd049d24aa Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Wed, 9 Apr 2025 23:06:46 -0700 Subject: [PATCH 3/3] store: Make sure we use the right connection to unlock the copy lock Otherwise, the lock will linger and can block an attempt to restart a copy that failed for transient reasons --- store/postgres/src/copy.rs | 107 ++++++++++++++++++++++++++++++------- 1 file changed, 88 insertions(+), 19 deletions(-) diff --git a/store/postgres/src/copy.rs b/store/postgres/src/copy.rs index fd736276cbd..22ddee394f6 100644 --- a/store/postgres/src/copy.rs +++ b/store/postgres/src/copy.rs @@ -64,8 +64,6 @@ const ACCEPTABLE_REPLICATION_LAG: Duration = Duration::from_secs(30); /// the lag again const REPLICATION_SLEEP: Duration = Duration::from_secs(10); -type PooledPgConnection = PooledConnection>; - lazy_static! { static ref STATEMENT_TIMEOUT: Option = ENV_VARS .store @@ -667,17 +665,77 @@ impl From> for WorkerResult { } } +/// We pass connections back and forth between the control loop and various +/// workers. We need to make sure that we end up with the connection that +/// was used to acquire the copy lock in the right place so we can release +/// the copy lock which is only possible with the connection that acquired +/// it. +/// +/// This struct helps us with that. It wraps a connection and tracks whether +/// the connection was used to acquire the copy lock +struct LockTrackingConnection { + inner: PooledConnection>, + has_lock: bool, +} + +impl LockTrackingConnection { + fn new(inner: PooledConnection>) -> Self { + Self { + inner, + has_lock: false, + } + } + + fn transaction(&mut self, f: F) -> Result + where + F: FnOnce(&mut PgConnection) -> Result, + { + let conn = &mut self.inner; + conn.transaction(|conn| f(conn)) + } + + /// Put `self` into `other` if `self` has the lock. + fn extract(self, other: &mut Option) { + if self.has_lock { + *other = Some(self); + } + } + + fn lock(&mut self, logger: &Logger, dst: &Site) -> Result<(), StoreError> { + if self.has_lock { + warn!(logger, "already acquired copy lock for {}", dst); + return Ok(()); + } + advisory_lock::lock_copying(&mut self.inner, dst)?; + self.has_lock = true; + Ok(()) + } + + fn unlock(&mut self, logger: &Logger, dst: &Site) -> Result<(), StoreError> { + if !self.has_lock { + error!( + logger, + "tried to release copy lock for {} even though we are not the owner", dst + ); + return Ok(()); + } + advisory_lock::unlock_copying(&mut self.inner, dst)?; + self.has_lock = false; + Ok(()) + } +} + /// A helper to run copying of one table. We need to thread `conn` and /// `table` from the control loop to the background worker and back again to /// the control loop. This worker facilitates that struct CopyTableWorker { - conn: PooledPgConnection, + conn: LockTrackingConnection, table: TableState, result: Result, } impl CopyTableWorker { - fn new(conn: PooledPgConnection, table: TableState) -> Self { + fn new(conn: LockTrackingConnection, table: TableState) -> Self { Self { conn, table, @@ -699,7 +757,7 @@ impl CopyTableWorker { fn run_inner(&mut self, logger: Logger, progress: &CopyProgress) -> Result { use Status::*; - let conn = &mut self.conn; + let conn = &mut self.conn.inner; progress.start_table(&self.table); while !self.table.finished() { // It is important that this check happens outside the write @@ -855,7 +913,7 @@ pub struct Connection { /// individual table. Except for that case, this will always be /// `Some(..)`. Most code shouldn't access `self.conn` directly, but use /// `self.transaction` - conn: Option, + conn: Option, pool: ConnectionPool, primary: Primary, workers: usize, @@ -901,9 +959,9 @@ impl Connection { } false })?; - let conn = Some(conn); let src_manifest_idx_and_name = Arc::new(src_manifest_idx_and_name); let dst_manifest_idx_and_name = Arc::new(dst_manifest_idx_and_name); + let conn = Some(LockTrackingConnection::new(conn)); Ok(Self { logger, conn, @@ -990,6 +1048,7 @@ impl Connection { let Some(table) = state.unfinished.pop() else { return None; }; + let conn = LockTrackingConnection::new(conn); let worker = CopyTableWorker::new(conn, table); Some(Box::pin( @@ -1031,7 +1090,7 @@ impl Connection { let result = workers.select().await; match result { Ok(worker) => { - self.conn = Some(worker.conn); + worker.conn.extract(&mut self.conn); } Err(e) => { /* Ignore; we had an error previously */ @@ -1098,7 +1157,7 @@ impl Connection { W::Ok(worker) => { // Put the connection back into self.conn so that we can use it // in the next iteration. - self.conn = Some(worker.conn); + worker.conn.extract(&mut self.conn); match (worker.result, progress.is_cancelled()) { (Ok(Status::Finished), false) => { @@ -1207,20 +1266,30 @@ impl Connection { ); let dst_site = self.dst.site.cheap_clone(); - self.transaction(|conn| advisory_lock::lock_copying(conn, &dst_site))?; + let Some(conn) = self.conn.as_mut() else { + return Err(constraint_violation!( + "copy connection went missing (copy_data)" + )); + }; + conn.lock(&self.logger, &dst_site)?; let res = self.copy_data_internal(index_list).await; - if self.conn.is_none() { - // A background worker panicked and left us without our - // dedicated connection, but we still need to release the copy - // lock; get a normal connection, not from the fdw pool for that - // as that will be much less contended. We won't be holding on - // to the connection for long as `res` will be an error and we - // will abort starting this subgraph - self.conn = Some(self.pool.get()?); + match self.conn.as_mut() { + None => { + // A background worker panicked and left us without our + // dedicated connection; we would need to get that + // connection to unlock the advisory lock. We can't do that, + // so we just log an error + warn!( + self.logger, + "can't unlock copy lock since the default worker panicked; lock will linger until session ends" + ); + } + Some(conn) => { + conn.unlock(&self.logger, &dst_site)?; + } } - self.transaction(|conn| advisory_lock::unlock_copying(conn, &dst_site))?; if matches!(res, Ok(Status::Cancelled)) { warn!(&self.logger, "Copying was cancelled and is incomplete");