diff --git a/store/postgres/src/copy.rs b/store/postgres/src/copy.rs
index effe2950ee2..22ddee394f6 100644
--- a/store/postgres/src/copy.rs
+++ b/store/postgres/src/copy.rs
@@ -37,7 +37,7 @@ use graph::{
         info, lazy_static, o, warn, BlockNumber, BlockPtr, CheapClone, Logger, StoreError,
         ENV_VARS,
     },
     schema::EntityType,
-    slog::{debug, error},
+    slog::error,
     tokio,
 };
 use itertools::Itertools;
@@ -45,7 +45,7 @@ use itertools::Itertools;
 use crate::{
     advisory_lock, catalog, deployment,
     dynds::DataSourcesTable,
-    primary::{DeploymentId, Site},
+    primary::{DeploymentId, Primary, Site},
     relational::index::IndexList,
     vid_batcher::{VidBatcher, VidRange},
 };
@@ -64,8 +64,6 @@ const ACCEPTABLE_REPLICATION_LAG: Duration = Duration::from_secs(30);
 /// the lag again
 const REPLICATION_SLEEP: Duration = Duration::from_secs(10);
 
-type PooledPgConnection = PooledConnection<ConnectionManager<PgConnection>>;
-
 lazy_static! {
     static ref STATEMENT_TIMEOUT: Option<String> = ENV_VARS
         .store
@@ -104,46 +102,6 @@ table! {
     }
 }
 
-// This is the same as primary::active_copies, but mapped into each shard
-table! {
-    primary_public.active_copies(dst) {
-        src -> Integer,
-        dst -> Integer,
-        cancelled_at -> Nullable<Timestamptz>,
-    }
-}
-
-/// Return `true` if the site is the source of a copy operation. The copy
-/// operation might be just queued or in progress already. This method will
-/// block until a fdw connection becomes available.
-pub fn is_source(logger: &Logger, pool: &ConnectionPool, site: &Site) -> Result<bool, StoreError> {
-    use active_copies as ac;
-
-    // We use a fdw connection to check if the site is being copied. If we
-    // used an ordinary connection and there are many calls to this method,
-    // postgres_fdw might open an unmanageable number of connections into
-    // the primary, which makes the primary run out of connections
-    let mut last_log = Instant::now();
-    let mut conn = pool.get_fdw(&logger, || {
-        if last_log.elapsed() > LOG_INTERVAL {
-            last_log = Instant::now();
-            debug!(
-                logger,
-                "Waiting for fdw connection to check if site {} is being copied", site.namespace
-            );
-        }
-        false
-    })?;
-
-    select(diesel::dsl::exists(
-        ac::table
-            .filter(ac::src.eq(site.id))
-            .filter(ac::cancelled_at.is_null()),
-    ))
-    .get_result::<bool>(&mut conn)
-    .map_err(StoreError::from)
-}
-
 #[derive(Copy, Clone, PartialEq, Eq, Debug)]
 pub enum Status {
     Finished,
@@ -161,6 +119,7 @@ struct CopyState {
 impl CopyState {
     fn new(
         conn: &mut PgConnection,
+        primary: Primary,
         src: Arc<Layout>,
         dst: Arc<Layout>,
         target_block: BlockPtr,
@@ -199,9 +158,9 @@ impl CopyState {
                         src.site.id
                     ));
                 }
-                Self::load(conn, src, dst, target_block)
+                Self::load(conn, primary, src, dst, target_block)
             }
-            None => Self::create(conn, src, dst, target_block),
+            None => Self::create(conn, primary.cheap_clone(), src, dst, target_block),
         }?;
         Ok(state)
     }
@@ -209,11 +168,12 @@ impl CopyState {
 
     fn load(
         conn: &mut PgConnection,
+        primary: Primary,
         src: Arc<Layout>,
         dst: Arc<Layout>,
         target_block: BlockPtr,
     ) -> Result<CopyState, StoreError> {
-        let tables = TableState::load(conn, src.as_ref(), dst.as_ref())?;
+        let tables = TableState::load(conn, primary, src.as_ref(), dst.as_ref())?;
         let (finished, mut unfinished): (Vec<_>, Vec<_>) =
             tables.into_iter().partition(|table| table.finished());
         unfinished.sort_by_key(|table| table.dst.object.to_string());
@@ -228,6 +188,7 @@ impl CopyState {
 
     fn create(
         conn: &mut PgConnection,
+        primary: Primary,
         src: Arc<Layout>,
         dst: Arc<Layout>,
         target_block: BlockPtr,
@@ -253,6 +214,7 @@ impl CopyState {
             .map(|src_table| {
                 TableState::init(
                     conn,
+                    primary.cheap_clone(),
                     dst.site.clone(),
                     &src,
                     src_table.clone(),
@@ -354,6 +316,7 @@ pub(crate) fn source(
 /// transformation. See `CopyEntityBatchQuery` for the details of what
 /// exactly that means
 struct TableState {
+    primary: Primary,
     src: Arc<Table>,
     dst: Arc<Table>,
     dst_site: Arc<Site>,
@@ -364,6 +327,7 @@ struct TableState {
 impl TableState {
     fn init(
         conn: &mut PgConnection,
+        primary: Primary,
         dst_site: Arc<Site>,
         src_layout: &Layout,
         src: Arc<Table>,
@@ -373,6 +337,7 @@ impl TableState {
         let vid_range = VidRange::for_copy(conn, &src, target_block)?;
         let batcher = VidBatcher::load(conn, &src_layout.site.namespace, src.as_ref(), vid_range)?;
         Ok(Self {
+            primary,
             src,
             dst,
             dst_site,
@@ -387,6 +352,7 @@ impl TableState {
 
     fn load(
         conn: &mut PgConnection,
+        primary: Primary,
         src_layout: &Layout,
         dst_layout: &Layout,
     ) -> Result<Vec<TableState>, StoreError> {
@@ -450,6 +416,7 @@ impl TableState {
                     .with_batch_size(size as usize);
 
                 Ok(TableState {
+                    primary: primary.cheap_clone(),
                     src,
                     dst,
                     dst_site: dst_layout.site.clone(),
@@ -516,13 +483,8 @@ impl TableState {
     }
 
     fn is_cancelled(&self, conn: &mut PgConnection) -> Result<bool, StoreError> {
-        use active_copies as ac;
-
         let dst = self.dst_site.as_ref();
-        let canceled = ac::table
-            .filter(ac::dst.eq(dst.id))
-            .select(ac::cancelled_at.is_not_null())
-            .get_result::<bool>(conn)?;
+        let canceled = self.primary.is_copy_cancelled(dst)?;
         if canceled {
             use copy_state as cs;
 
@@ -703,17 +665,77 @@ impl From<Result<CopyTableWorker, JoinError>> for WorkerResult {
     }
 }
 
+/// We pass connections back and forth between the control loop and various
+/// workers. We need to make sure that we end up with the connection that
+/// was used to acquire the copy lock in the right place so we can release
+/// the copy lock, which is only possible with the connection that acquired
+/// it.
+///
+/// This struct helps us with that. It wraps a connection and tracks whether
+/// the connection was used to acquire the copy lock.
+struct LockTrackingConnection {
+    inner: PooledConnection<ConnectionManager<PgConnection>>,
+    has_lock: bool,
+}
+
+impl LockTrackingConnection {
+    fn new(inner: PooledConnection<ConnectionManager<PgConnection>>) -> Self {
+        Self {
+            inner,
+            has_lock: false,
+        }
+    }
+
+    fn transaction<T, F>(&mut self, f: F) -> Result<T, StoreError>
+    where
+        F: FnOnce(&mut PgConnection) -> Result<T, StoreError>,
+    {
+        let conn = &mut self.inner;
+        conn.transaction(|conn| f(conn))
+    }
+
+    /// Put `self` into `other` if `self` has the lock.
+    fn extract(self, other: &mut Option<Self>) {
+        if self.has_lock {
+            *other = Some(self);
+        }
+    }
+
+    fn lock(&mut self, logger: &Logger, dst: &Site) -> Result<(), StoreError> {
+        if self.has_lock {
+            warn!(logger, "already acquired copy lock for {}", dst);
+            return Ok(());
+        }
+        advisory_lock::lock_copying(&mut self.inner, dst)?;
+        self.has_lock = true;
+        Ok(())
+    }
+
+    fn unlock(&mut self, logger: &Logger, dst: &Site) -> Result<(), StoreError> {
+        if !self.has_lock {
+            error!(
+                logger,
+                "tried to release copy lock for {} even though we are not the owner", dst
+            );
+            return Ok(());
+        }
+        advisory_lock::unlock_copying(&mut self.inner, dst)?;
+        self.has_lock = false;
+        Ok(())
+    }
+}
+
 /// A helper to run copying of one table. We need to thread `conn` and
 /// `table` from the control loop to the background worker and back again to
 /// the control loop. This worker facilitates that
 struct CopyTableWorker {
-    conn: PooledPgConnection,
+    conn: LockTrackingConnection,
     table: TableState,
     result: Result<Status, StoreError>,
 }
 
 impl CopyTableWorker {
-    fn new(conn: PooledPgConnection, table: TableState) -> Self {
+    fn new(conn: LockTrackingConnection, table: TableState) -> Self {
         Self {
             conn,
             table,
@@ -735,7 +757,7 @@ impl CopyTableWorker {
     fn run_inner(&mut self, logger: Logger, progress: &CopyProgress) -> Result<Status, StoreError> {
         use Status::*;
 
-        let conn = &mut self.conn;
+        let conn = &mut self.conn.inner;
         progress.start_table(&self.table);
         while !self.table.finished() {
             // It is important that this check happens outside the write
@@ -891,8 +913,9 @@ pub struct Connection {
     /// individual table. Except for that case, this will always be
     /// `Some(..)`. Most code shouldn't access `self.conn` directly, but use
     /// `self.transaction`
-    conn: Option<PooledPgConnection>,
+    conn: Option<LockTrackingConnection>,
     pool: ConnectionPool,
+    primary: Primary,
     workers: usize,
     src: Arc<Layout>,
     dst: Arc<Layout>,
@@ -910,6 +933,7 @@ impl Connection {
     /// is available.
    pub fn new(
        logger: &Logger,
+        primary: Primary,
        pool: ConnectionPool,
        src: Arc<Layout>,
        dst: Arc<Layout>,
@@ -935,13 +959,14 @@ impl Connection {
             }
             false
         })?;
-        let conn = Some(conn);
         let src_manifest_idx_and_name = Arc::new(src_manifest_idx_and_name);
         let dst_manifest_idx_and_name = Arc::new(dst_manifest_idx_and_name);
+        let conn = Some(LockTrackingConnection::new(conn));
         Ok(Self {
             logger,
             conn,
             pool,
+            primary,
             workers: ENV_VARS.store.batch_workers,
             src,
             dst,
@@ -1023,6 +1048,7 @@ impl Connection {
         let Some(table) = state.unfinished.pop() else {
             return None;
         };
+        let conn = LockTrackingConnection::new(conn);
         let worker = CopyTableWorker::new(conn, table);
 
         Some(Box::pin(
@@ -1064,7 +1090,7 @@ impl Connection {
             let result = workers.select().await;
             match result {
                 Ok(worker) => {
-                    self.conn = Some(worker.conn);
+                    worker.conn.extract(&mut self.conn);
                 }
                 Err(e) => { /* Ignore; we had an error previously */ }
             }
@@ -1079,7 +1105,9 @@ impl Connection {
         let src = self.src.clone();
         let dst = self.dst.clone();
         let target_block = self.target_block.clone();
-        let mut state = self.transaction(|conn| CopyState::new(conn, src, dst, target_block))?;
+        let primary = self.primary.cheap_clone();
+        let mut state =
+            self.transaction(|conn| CopyState::new(conn, primary, src, dst, target_block))?;
 
         let progress = Arc::new(CopyProgress::new(self.logger.cheap_clone(), &state));
         progress.start();
@@ -1122,13 +1150,14 @@ impl Connection {
                 W::Err(e) => {
                     // This is a panic in the background task. We need to
                     // cancel all other tasks and return the error
+                    error!(self.logger, "copy worker panicked: {}", e);
                     self.cancel_workers(progress, workers).await;
                     return Err(e);
                 }
                 W::Ok(worker) => {
                     // Put the connection back into self.conn so that we can use it
                     // in the next iteration.
-                    self.conn = Some(worker.conn);
+                    worker.conn.extract(&mut self.conn);
 
                     match (worker.result, progress.is_cancelled()) {
                         (Ok(Status::Finished), false) => {
@@ -1146,6 +1175,7 @@ impl Connection {
                             return Ok(Status::Cancelled);
                         }
                         (Err(e), _) => {
+                            error!(self.logger, "copy worker had an error: {}", e);
                             self.cancel_workers(progress, workers).await;
                             return Err(e);
                         }
@@ -1236,20 +1266,30 @@ impl Connection {
         );
 
         let dst_site = self.dst.site.cheap_clone();
-        self.transaction(|conn| advisory_lock::lock_copying(conn, &dst_site))?;
+        let Some(conn) = self.conn.as_mut() else {
+            return Err(constraint_violation!(
+                "copy connection went missing (copy_data)"
+            ));
+        };
+        conn.lock(&self.logger, &dst_site)?;
         let res = self.copy_data_internal(index_list).await;
-        if self.conn.is_none() {
-            // A background worker panicked and left us without our
-            // dedicated connection, but we still need to release the copy
-            // lock; get a normal connection, not from the fdw pool for that
-            // as that will be much less contended. We won't be holding on
-            // to the connection for long as `res` will be an error and we
-            // will abort starting this subgraph
-            self.conn = Some(self.pool.get()?);
+        match self.conn.as_mut() {
+            None => {
+                // A background worker panicked and left us without our
+                // dedicated connection; we would need to get that
+                // connection to unlock the advisory lock. We can't do that,
+                // so we just log an error
+                warn!(
+                    self.logger,
+                    "can't unlock copy lock since the default worker panicked; lock will linger until session ends"
+                );
+            }
+            Some(conn) => {
+                conn.unlock(&self.logger, &dst_site)?;
+            }
         }
-        self.transaction(|conn| advisory_lock::unlock_copying(conn, &dst_site))?;
 
         if matches!(res, Ok(Status::Cancelled)) {
             warn!(&self.logger, "Copying was cancelled and is incomplete");
diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs
index c78b06be46d..e497430c2bf 100644
--- a/store/postgres/src/deployment_store.rs
+++ b/store/postgres/src/deployment_store.rs
@@ -51,11 +51,11 @@ use crate::block_range::{BLOCK_COLUMN, BLOCK_RANGE_COLUMN};
 use crate::deployment::{self, OnSync};
 use crate::detail::ErrorDetail;
 use crate::dynds::DataSourcesTable;
-use crate::primary::DeploymentId;
+use crate::primary::{DeploymentId, Primary};
 use crate::relational::index::{CreateIndex, IndexList, Method};
 use crate::relational::{Layout, LayoutCache, SqlName, Table};
 use crate::relational_queries::FromEntityData;
-use crate::{advisory_lock, catalog, copy, retry};
+use crate::{advisory_lock, catalog, retry};
 use crate::{connection_pool::ConnectionPool, detail};
 use crate::{dynds, primary::Site};
@@ -93,6 +93,8 @@ type PruneHandle = JoinHandle<Result<(), StoreError>>;
 pub struct StoreInner {
     logger: Logger,
 
+    primary: Primary,
+
     pool: ConnectionPool,
     read_only_pools: Vec<ConnectionPool>,
@@ -130,6 +132,7 @@ impl Deref for DeploymentStore {
 impl DeploymentStore {
     pub fn new(
         logger: &Logger,
+        primary: Primary,
         pool: ConnectionPool,
         read_only_pools: Vec<ConnectionPool>,
         mut pool_weights: Vec<usize>,
@@ -160,6 +163,7 @@ impl DeploymentStore {
         // Create the store
         let store = StoreInner {
             logger: logger.clone(),
+            primary,
             pool,
             read_only_pools,
             replica_order,
@@ -1235,7 +1239,7 @@ impl DeploymentStore {
         req: PruneRequest,
     ) -> Result<(), StoreError> {
         {
-            if copy::is_source(&logger, &store.pool, &site)? {
+            if store.is_source(&site)? {
                 debug!(
                     logger,
                     "Skipping pruning since this deployment is being copied"
@@ -1520,6 +1524,7 @@ impl DeploymentStore {
         // with the corresponding tables in `self`
         let copy_conn = crate::copy::Connection::new(
             logger,
+            self.primary.cheap_clone(),
             self.pool.clone(),
             src.clone(),
             dst.clone(),
@@ -1848,6 +1853,10 @@ impl DeploymentStore {
         })
         .await
     }
+
+    fn is_source(&self, site: &Site) -> Result<bool, StoreError> {
+        self.primary.is_source(site)
+    }
 }
 
 /// Tries to fetch a [`Table`] either by its Entity name or its SQL name.
diff --git a/store/postgres/src/primary.rs b/store/postgres/src/primary.rs
index f329ae4bba2..39df898ba32 100644
--- a/store/postgres/src/primary.rs
+++ b/store/postgres/src/primary.rs
@@ -36,6 +36,7 @@ use graph::{
         store::scalar::ToPrimitive,
         subgraph::{status, DeploymentFeatures},
     },
+    derive::CheapClone,
     prelude::{
         anyhow,
         chrono::{DateTime, Utc},
@@ -53,9 +54,9 @@ use maybe_owned::MaybeOwnedMut;
 use std::{
     borrow::Borrow,
     collections::HashMap,
-    convert::TryFrom,
-    convert::TryInto,
+    convert::{TryFrom, TryInto},
     fmt,
+    sync::Arc,
     time::{SystemTime, UNIX_EPOCH},
 };
@@ -1826,6 +1827,52 @@ impl<'a> Connection<'a> {
     }
 }
 
+/// A limited interface to query the primary database.
+#[derive(Clone, CheapClone)]
+pub struct Primary {
+    pool: Arc<ConnectionPool>,
+}
+
+impl Primary {
+    pub fn new(pool: Arc<ConnectionPool>) -> Self {
+        // This really indicates a programming error
+        if pool.shard != *PRIMARY_SHARD {
+            panic!("Primary pool must be the primary shard");
+        }
+
+        Primary { pool }
+    }
+
+    /// Return `true` if the site is the source of a copy operation. The copy
+    /// operation might be just queued or in progress already. This method
+    /// blocks until a connection becomes available.
+    pub fn is_source(&self, site: &Site) -> Result<bool, StoreError> {
+        use active_copies as ac;
+
+        let mut conn = self.pool.get()?;
+
+        select(diesel::dsl::exists(
+            ac::table
+                .filter(ac::src.eq(site.id))
+                .filter(ac::cancelled_at.is_null()),
+        ))
+        .get_result::<bool>(&mut conn)
+        .map_err(StoreError::from)
+    }
+
+    pub fn is_copy_cancelled(&self, dst: &Site) -> Result<bool, StoreError> {
+        use active_copies as ac;
+
+        let mut conn = self.pool.get()?;
+
+        ac::table
+            .filter(ac::dst.eq(dst.id))
+            .select(ac::cancelled_at.is_not_null())
+            .get_result::<bool>(&mut conn)
+            .map_err(StoreError::from)
+    }
+}
+
 /// Return `true` if we deem this installation to be empty, defined as
 /// having no deployments and no subgraph names in the database
 pub fn is_empty(conn: &mut PgConnection) -> Result<bool, StoreError> {
diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs
index 0beeadf345d..339c66cee3f 100644
--- a/store/postgres/src/subgraph_store.rs
+++ b/store/postgres/src/subgraph_store.rs
@@ -39,7 +39,7 @@ use graph::{
 use crate::{
     connection_pool::ConnectionPool,
     deployment::{OnSync, SubgraphHealth},
-    primary::{self, DeploymentId, Mirror as PrimaryMirror, Site},
+    primary::{self, DeploymentId, Mirror as PrimaryMirror, Primary, Site},
     relational::{
         index::{IndexList, Method},
         Layout,
@@ -360,6 +360,12 @@ impl SubgraphStoreInner {
         sender: Arc<NotificationSender>,
         registry: Arc<MetricsRegistry>,
     ) -> Self {
+        let primary = stores
+            .iter()
+            .find(|(name, _, _, _)| name == &*PRIMARY_SHARD)
+            .map(|(_, pool, _, _)| Primary::new(Arc::new(pool.clone())))
+            .expect("primary shard must be present");
+
         let mirror = {
             let pools = HashMap::from_iter(
                 stores
@@ -376,6 +382,7 @@ impl SubgraphStoreInner {
                 name,
                 Arc::new(DeploymentStore::new(
                     &logger,
+                    primary.cheap_clone(),
                     main_pool,
                     read_only_pools,
                     weights,
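
Reviewer note, not part of the patch: the `LockTrackingConnection` bookkeeping above exists because the copy lock taken via `advisory_lock::lock_copying` is a Postgres session-level advisory lock, and such locks belong to the session (connection) that acquired them and can only be released there. The sketch below illustrates that Postgres behavior with plain diesel; the function name `advisory_lock_demo`, the `db_url` parameter, and the lock key 4711 are made-up example values, not anything from this codebase.

use diesel::{sql_query, Connection, PgConnection, RunQueryDsl};

fn advisory_lock_demo(db_url: &str) -> diesel::QueryResult<()> {
    // Two independent sessions against the same database.
    let mut owner = PgConnection::establish(db_url).expect("owner connects");
    let mut other = PgConnection::establish(db_url).expect("other connects");

    // `owner`'s session now holds the session-level advisory lock 4711.
    sql_query("SELECT pg_advisory_lock(4711)").execute(&mut owner)?;

    // Releasing it from a different session does nothing: Postgres returns
    // false and reports "you don't own a lock of type ExclusiveLock" as a
    // warning.
    sql_query("SELECT pg_advisory_unlock(4711)").execute(&mut other)?;

    // Only the owning session can actually release the lock, which is why
    // the patch threads the lock-taking connection back to the control loop
    // instead of unlocking through an arbitrary pooled connection.
    sql_query("SELECT pg_advisory_unlock(4711)").execute(&mut owner)?;
    Ok(())
}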
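
A second note on `extract`: as workers finish, the control loop gets back several connections, but only the one that holds the advisory lock may be stored back into `self.conn`. The toy model below, with a hypothetical `TrackedConn` standing in for the patch's types, shows how the `has_lock` flag makes the hand-off independent of the order in which workers return.

// Toy stand-ins for LockTrackingConnection and `Connection::conn`.
struct TrackedConn {
    id: u32,
    has_lock: bool,
}

impl TrackedConn {
    // Mirrors LockTrackingConnection::extract: keep only the lock owner.
    fn extract(self, slot: &mut Option<TrackedConn>) {
        if self.has_lock {
            *slot = Some(self);
        }
    }
}

fn main() {
    let mut slot: Option<TrackedConn> = None;
    // Workers come back in arbitrary order; only worker 2 took the lock.
    for (id, has_lock) in [(1, false), (2, true), (3, false)] {
        TrackedConn { id, has_lock }.extract(&mut slot);
    }
    assert_eq!(slot.map(|c| c.id), Some(2));
}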