From 1bea1b9c4c2bd75fedaf9d3fb5f68861399ef85c Mon Sep 17 00:00:00 2001 From: reivilibre Date: Wed, 24 Apr 2024 16:47:48 +0000 Subject: [PATCH] Fix a bug introduced in v1.99.0-lts.2 where a worker being upgraded before the main process would cause Synapse to refuse to start. (#25) --- changelog.d/25.bugfix | 1 + synapse/storage/prepare_database.py | 42 +++++++++++++++---- tests/storage/test_capabilities.py | 64 ++++++++++++++++++++++++++++- 3 files changed, 97 insertions(+), 10 deletions(-) create mode 100644 changelog.d/25.bugfix diff --git a/changelog.d/25.bugfix b/changelog.d/25.bugfix new file mode 100644 index 00000000000..a0fc7afa570 --- /dev/null +++ b/changelog.d/25.bugfix @@ -0,0 +1 @@ +Fix a bug introduced in v1.99.0-lts.2 where a worker being upgraded before the main process would cause Synapse to refuse to start. \ No newline at end of file diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index f0dc5e4f854..124724897d4 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -66,6 +66,14 @@ class UpgradeDatabaseException(PrepareDatabaseException): "Expected database schema version %i but got %i: run the main synapse process to " "upgrade the database schema before starting worker processes." ) +OUTDATED_CAPABILITIES_ON_WORKER_ERROR = ( + "The database is missing some required capabilities %r: run the main synapse process to " + "upgrade the database schema before starting worker processes." +) +OUTDATED_COMPAT_VERSION_ON_WORKER_ERROR = ( + "The database needs the compat_version to be updated from %i to %i: run the main synapse process to " + "upgrade the database schema before starting worker processes." +) EMPTY_DATABASE_ON_WORKER_ERROR = ( "Uninitialised database: run the main synapse process to prepare the database " @@ -403,6 +411,18 @@ def _upgrade_existing_database( % (SCHEMA_VERSION, current_schema_state.current_version) ) + required_capabilities_to_add = REQUIRED_CAPABILITIES.difference( + current_schema_state.required_capabilities + ) + + if is_worker and required_capabilities_to_add: + # To start up, we'd need to add some required capabilities to the schema's state + # But as this is a worker, we can't do this and need the main process to run + # first. + raise UpgradeDatabaseException( + OUTDATED_CAPABILITIES_ON_WORKER_ERROR % required_capabilities_to_add + ) + # Check that this version of Synapse supports all the required capabilities. missing_capabilities = current_schema_state.required_capabilities - CAPABILITIES if missing_capabilities: @@ -425,9 +445,7 @@ def _upgrade_existing_database( # Record the new required capabilities for this version. if not is_worker: - for new_required_capability in REQUIRED_CAPABILITIES.difference( - current_schema_state.required_capabilities - ): + for new_required_capability in required_capabilities_to_add: cur.execute( "INSERT INTO schema_required_capabilities (capability) VALUES (?)", (new_required_capability,), @@ -447,11 +465,19 @@ def _upgrade_existing_database( current_schema_state.compat_version is None or current_schema_state.compat_version < SCHEMA_COMPAT_VERSION ): - cur.execute("DELETE FROM schema_compat_version") - cur.execute( - "INSERT INTO schema_compat_version(compat_version) VALUES (?)", - (SCHEMA_COMPAT_VERSION,), - ) + if is_worker: + # Don't update the compat_version from a worker, but also don't start up + # as the worker's behaviour might depend on rollbacks being blocked. + raise UpgradeDatabaseException( + OUTDATED_COMPAT_VERSION_ON_WORKER_ERROR + % (current_schema_state.compat_version, SCHEMA_COMPAT_VERSION) + ) + else: + cur.execute("DELETE FROM schema_compat_version") + cur.execute( + "INSERT INTO schema_compat_version(compat_version) VALUES (?)", + (SCHEMA_COMPAT_VERSION,), + ) start_ver = current_schema_state.current_version diff --git a/tests/storage/test_capabilities.py b/tests/storage/test_capabilities.py index 2124fe2b2d3..7ce85bc5022 100644 --- a/tests/storage/test_capabilities.py +++ b/tests/storage/test_capabilities.py @@ -13,9 +13,19 @@ # # +from unittest.mock import Mock + from synapse.storage.database import LoggingDatabaseConnection -from synapse.storage.prepare_database import _SchemaState, _upgrade_existing_database -from synapse.storage.schema import REQUIRED_CAPABILITIES, SCHEMA_VERSION +from synapse.storage.prepare_database import ( + UpgradeDatabaseException, + _SchemaState, + _upgrade_existing_database, +) +from synapse.storage.schema import ( + REQUIRED_CAPABILITIES, + SCHEMA_COMPAT_VERSION, + SCHEMA_VERSION, +) from tests.unittest import HomeserverTestCase @@ -106,3 +116,53 @@ def test_noop(self) -> None: _upgrade_existing_database( self.cur, state, self.db_pool.engine, self.hs.config, ("main", "state") ) + + def test_worker_starts_if_all_ok(self) -> None: + state = _SchemaState( + current_version=SCHEMA_VERSION, + compat_version=SCHEMA_COMPAT_VERSION, + applied_deltas=self.applied_deltas, + upgraded=True, + required_capabilities=REQUIRED_CAPABILITIES, + ) + + worker_config = Mock() + worker_config.worker.worker_app = "test" + + _upgrade_existing_database( + self.cur, state, self.db_pool.engine, worker_config, ("main", "state") + ) + + def test_worker_wont_add_required_capabilities(self) -> None: + state = _SchemaState( + current_version=SCHEMA_VERSION, + compat_version=SCHEMA_COMPAT_VERSION, + applied_deltas=self.applied_deltas, + upgraded=True, + required_capabilities=set(), + ) + + worker_config = Mock() + worker_config.worker.worker_app = "test" + + with self.assertRaises(UpgradeDatabaseException): + _upgrade_existing_database( + self.cur, state, self.db_pool.engine, worker_config, ("main", "state") + ) + + def test_worker_wont_add_upgrade_schema_compat_version(self) -> None: + state = _SchemaState( + current_version=SCHEMA_VERSION, + compat_version=SCHEMA_COMPAT_VERSION - 1, + applied_deltas=self.applied_deltas, + upgraded=True, + required_capabilities=REQUIRED_CAPABILITIES, + ) + + worker_config = Mock() + worker_config.worker.worker_app = "test" + + with self.assertRaises(UpgradeDatabaseException): + _upgrade_existing_database( + self.cur, state, self.db_pool.engine, worker_config, ("main", "state") + )