From 6114e61e134b7aa4bfd11b64b78c7131ac0bcc58 Mon Sep 17 00:00:00 2001 From: Daniel Tabacaru <96778637+danieltabacaru@users.noreply.github.com> Date: Mon, 11 Sep 2023 13:25:15 +0300 Subject: [PATCH] Upload data and migrate schema (#6944) * Main work of sync schema migrations * misc * Code review changes --- src/realm/error_codes.cpp | 1 + src/realm/error_codes.h | 3 + src/realm/error_codes.hpp | 1 + .../object-store/impl/realm_coordinator.cpp | 7 ++ src/realm/object-store/shared_realm.cpp | 17 ++- src/realm/object-store/shared_realm.hpp | 3 +- .../object-store/sync/async_open_task.cpp | 111 +++++++++++++++--- .../object-store/sync/async_open_task.hpp | 9 +- src/realm/object-store/sync/sync_session.cpp | 17 +++ src/realm/object-store/sync/sync_session.hpp | 10 ++ src/realm/sync/noinst/client_impl_base.cpp | 11 ++ src/realm/sync/noinst/protocol_codec.hpp | 1 + src/realm/sync/protocol.cpp | 9 ++ src/realm/sync/protocol.hpp | 5 + test/object-store/sync/session/session.cpp | 3 +- 15 files changed, 183 insertions(+), 25 deletions(-) diff --git a/src/realm/error_codes.cpp b/src/realm/error_codes.cpp index 6adbcd4319e..f7a10810b04 100644 --- a/src/realm/error_codes.cpp +++ b/src/realm/error_codes.cpp @@ -61,6 +61,7 @@ ErrorCategory ErrorCodes::error_categories(Error code) case SyncServerPermissionsChanged: case SyncUserMismatch: case SyncWriteNotAllowed: + case SyncSchemaMigrationError: return ErrorCategory().set(ErrorCategory::runtime_error).set(ErrorCategory::sync_error); case SyncConnectFailed: diff --git a/src/realm/error_codes.h b/src/realm/error_codes.h index 44549c8a655..7a2a65ca088 100644 --- a/src/realm/error_codes.h +++ b/src/realm/error_codes.h @@ -83,6 +83,7 @@ typedef enum realm_errno { RLM_ERR_TLS_HANDSHAKE_FAILED = 1042, RLM_ERR_WRONG_SYNC_TYPE = 1043, RLM_ERR_SYNC_WRITE_NOT_ALLOWED = 1044, + RLM_ERR_SYNC_SCHEMA_MIGRATION_ERROR = 1045, RLM_ERR_SYSTEM_ERROR = 1999, @@ -262,6 +263,8 @@ typedef enum realm_sync_errno_session { RLM_SYNC_ERR_SESSION_MIGRATE_TO_FLX = 232, RLM_SYNC_ERR_SESSION_BAD_PROGRESS = 233, RLM_SYNC_ERR_SESSION_REVERT_TO_PBS = 234, + RLM_SYNC_ERR_SESSION_BAD_SCHEMA_VERSION = 235, + RLM_SYNC_ERR_SESSION_SCHEMA_VERSION_CHANGED = 236, // Error code 299 is reserved as an "unknown session error" in tests } realm_sync_errno_session_e; diff --git a/src/realm/error_codes.hpp b/src/realm/error_codes.hpp index cc8d3fd6eb0..ddb08912717 100644 --- a/src/realm/error_codes.hpp +++ b/src/realm/error_codes.hpp @@ -126,6 +126,7 @@ class ErrorCodes { TlsHandshakeFailed = RLM_ERR_TLS_HANDSHAKE_FAILED, WrongSyncType = RLM_ERR_WRONG_SYNC_TYPE, SyncWriteNotAllowed = RLM_ERR_SYNC_WRITE_NOT_ALLOWED, + SyncSchemaMigrationError = RLM_ERR_SYNC_SCHEMA_MIGRATION_ERROR, SystemError = RLM_ERR_SYSTEM_ERROR, diff --git a/src/realm/object-store/impl/realm_coordinator.cpp b/src/realm/object-store/impl/realm_coordinator.cpp index 69c2f674077..8ffb09d910b 100644 --- a/src/realm/object-store/impl/realm_coordinator.cpp +++ b/src/realm/object-store/impl/realm_coordinator.cpp @@ -563,6 +563,13 @@ void RealmCoordinator::delete_and_reopen() util::CheckedLockGuard lock(m_realm_mutex); close(); util::File::remove(m_config.path); +#if REALM_ENABLE_SYNC + // Close the sync session. + if (m_sync_session) { + m_sync_session->force_close(); + m_sync_session = nullptr; + } +#endif open_db(); } diff --git a/src/realm/object-store/shared_realm.cpp b/src/realm/object-store/shared_realm.cpp index 596684a8271..057b37d41c7 100644 --- a/src/realm/object-store/shared_realm.cpp +++ b/src/realm/object-store/shared_realm.cpp @@ -290,8 +290,7 @@ bool Realm::schema_change_needs_write_transaction(Schema& schema, std::vector { * will be thrown instead. * * If the destination file does not exist, the action performed depends on - * the type of the source and destimation files. If the destination + * the type of the source and destination files. If the destination * configuration is a non-sync local Realm configuration, a compacted copy * of the current Transaction's data (which includes uncommitted changes if * applicable!) is written in streaming form, with no history. @@ -550,6 +550,7 @@ class Realm : public std::enable_shared_from_this { void set_schema(Schema const& reference, Schema schema); bool reset_file(Schema& schema, std::vector& changes_required); bool schema_change_needs_write_transaction(Schema& schema, std::vector& changes, uint64_t version); + void verify_schema_version_not_decreasing(uint64_t version); Schema get_full_schema(); // Ensure that m_schema and m_schema_version match that of the current diff --git a/src/realm/object-store/sync/async_open_task.cpp b/src/realm/object-store/sync/async_open_task.cpp index 395440c4611..5ff79181c1f 100644 --- a/src/realm/object-store/sync/async_open_task.cpp +++ b/src/realm/object-store/sync/async_open_task.cpp @@ -34,7 +34,7 @@ AsyncOpenTask::AsyncOpenTask(std::shared_ptr<_impl::RealmCoordinator> coordinato { } -void AsyncOpenTask::start(AsyncOpenCallback async_open_complete) +void AsyncOpenTask::start(AsyncOpenCallback callback) { util::CheckedUniqueLock lock(m_mutex); if (!m_session) @@ -43,8 +43,7 @@ void AsyncOpenTask::start(AsyncOpenCallback async_open_complete) lock.unlock(); std::shared_ptr self(shared_from_this()); - session->wait_for_download_completion([async_open_complete = std::move(async_open_complete), self, - this](Status status) mutable { + session->wait_for_download_completion([callback = std::move(callback), self, this](Status status) mutable { std::shared_ptr<_impl::RealmCoordinator> coordinator; { util::CheckedLockGuard lock(m_mutex); @@ -56,18 +55,19 @@ void AsyncOpenTask::start(AsyncOpenCallback async_open_complete) } if (!status.is_ok()) { - self->async_open_complete(std::move(async_open_complete), coordinator, status); + self->async_open_complete(std::move(callback), coordinator, status); return; } - auto config = coordinator->get_config(); - if (config.sync_config && config.sync_config->flx_sync_requested && - config.sync_config->subscription_initializer) { - const bool rerun_on_launch = config.sync_config->rerun_init_subscription_on_open; - self->attach_to_subscription_initializer(std::move(async_open_complete), coordinator, rerun_on_launch); - } - else { - self->async_open_complete(std::move(async_open_complete), coordinator, status); + self->migrate_schema_or_complete(std::move(callback), coordinator, status); + }); + // The callback does not extend the lifetime of the task if it's never invoked. + SyncSession::Internal::set_sync_schema_migration_callback(*session, [weak_self = weak_from_this(), this]() { + if (auto self = weak_self.lock()) { + util::CheckedLockGuard lock(m_mutex); + if (!m_session) + return; + m_sync_schema_migration_required = true; } }); session->revive_if_needed(); @@ -118,7 +118,7 @@ void AsyncOpenTask::unregister_download_progress_notifier(uint64_t token) m_session->unregister_progress_notifier(token); } -void AsyncOpenTask::attach_to_subscription_initializer(AsyncOpenCallback&& async_open_callback, +void AsyncOpenTask::attach_to_subscription_initializer(AsyncOpenCallback&& callback, std::shared_ptr<_impl::RealmCoordinator> coordinator, bool rerun_on_launch) { @@ -136,13 +136,13 @@ void AsyncOpenTask::attach_to_subscription_initializer(AsyncOpenCallback&& async // We need to wait until subscription initializer completes std::shared_ptr self(shared_from_this()); init_subscription.get_state_change_notification(sync::SubscriptionSet::State::Complete) - .get_async([self, coordinator, async_open_callback = std::move(async_open_callback)]( + .get_async([self, coordinator, callback = std::move(callback)]( StatusWith state) mutable { - self->async_open_complete(std::move(async_open_callback), coordinator, state.get_status()); + self->async_open_complete(std::move(callback), coordinator, state.get_status()); }); } else { - async_open_complete(std::move(async_open_callback), coordinator, Status::OK()); + async_open_complete(std::move(callback), coordinator, Status::OK()); } } @@ -151,6 +151,10 @@ void AsyncOpenTask::async_open_complete(AsyncOpenCallback&& callback, { { util::CheckedLockGuard lock(m_mutex); + // 'Cancel' may have been called just before 'async_open_complete' is invoked. + if (!m_session) + return; + for (auto token : m_registered_callbacks) { m_session->unregister_progress_notifier(token); } @@ -170,4 +174,79 @@ void AsyncOpenTask::async_open_complete(AsyncOpenCallback&& callback, return callback({}, std::make_exception_ptr(Exception(status))); } +void AsyncOpenTask::migrate_schema_or_complete(AsyncOpenCallback&& callback, + std::shared_ptr<_impl::RealmCoordinator> coordinator, Status status) +{ + util::CheckedUniqueLock lock(m_mutex); + if (!m_session) + return; + auto session = m_session; + auto migrate_schema = m_sync_schema_migration_required; + lock.unlock(); + + if (!migrate_schema) { + wait_for_bootstrap_or_complete(std::move(callback), coordinator, status); + return; + } + + // Migrate the schema. + // * First upload the changes at the old schema version + // * Then delete the realm, reopen it, and rebootstrap at new schema version + // The lifetime of the task is extended until the bootstrap completes. + std::shared_ptr self(shared_from_this()); + session->wait_for_upload_completion([callback = std::move(callback), coordinator, session, self, + this](Status status) mutable { + { + util::CheckedLockGuard lock(m_mutex); + if (!m_session) + return; // Swallow all events if the task has been cancelled. + } + + if (!status.is_ok()) { + self->async_open_complete(std::move(callback), coordinator, status); + return; + } + + auto future = SyncSession::Internal::pause_async(*session); + // Wait until the SessionWrapper is done using the DBRef. + std::move(future).get_async([callback = std::move(callback), coordinator, self, this](Status status) mutable { + { + util::CheckedLockGuard lock(m_mutex); + if (!m_session) + return; // Swallow all events if the task has been cancelled. + } + + if (!status.is_ok()) { + self->async_open_complete(std::move(callback), coordinator, status); + return; + } + + // Delete the realm file and reopen it. + { + util::CheckedLockGuard lock(m_mutex); + m_session = nullptr; + coordinator->delete_and_reopen(); + m_session = coordinator->sync_session(); + } + + self->wait_for_bootstrap_or_complete(std::move(callback), coordinator, status); + }); + }); +} + +void AsyncOpenTask::wait_for_bootstrap_or_complete(AsyncOpenCallback&& callback, + std::shared_ptr<_impl::RealmCoordinator> coordinator, + Status status) +{ + auto config = coordinator->get_config(); + if (config.sync_config && config.sync_config->flx_sync_requested && + config.sync_config->subscription_initializer) { + const bool rerun_on_launch = config.sync_config->rerun_init_subscription_on_open; + attach_to_subscription_initializer(std::move(callback), coordinator, rerun_on_launch); + } + else { + async_open_complete(std::move(callback), coordinator, status); + } +} + } // namespace realm diff --git a/src/realm/object-store/sync/async_open_task.hpp b/src/realm/object-store/sync/async_open_task.hpp index 2dbf3b520bd..99df1e0e797 100644 --- a/src/realm/object-store/sync/async_open_task.hpp +++ b/src/realm/object-store/sync/async_open_task.hpp @@ -48,7 +48,7 @@ class AsyncOpenTask : public std::enable_shared_from_this { // // If multiple AsyncOpenTasks all attempt to download the same Realm and one of them is canceled, // the other tasks will receive a "Cancelled" exception. - void start(AsyncOpenCallback async_open_complete) REQUIRES(!m_mutex); + void start(AsyncOpenCallback callback) REQUIRES(!m_mutex); // Cancels the download and stops the session. No further functions should be called on this class. void cancel() REQUIRES(!m_mutex); @@ -62,12 +62,17 @@ class AsyncOpenTask : public std::enable_shared_from_this { REQUIRES(!m_mutex); void attach_to_subscription_initializer(AsyncOpenCallback&&, std::shared_ptr<_impl::RealmCoordinator>, bool) REQUIRES(!m_mutex); + void migrate_schema_or_complete(AsyncOpenCallback&&, std::shared_ptr<_impl::RealmCoordinator>, Status) + REQUIRES(!m_mutex); + void wait_for_bootstrap_or_complete(AsyncOpenCallback&&, std::shared_ptr<_impl::RealmCoordinator>, Status) + REQUIRES(!m_mutex); std::shared_ptr<_impl::RealmCoordinator> m_coordinator GUARDED_BY(m_mutex); std::shared_ptr m_session GUARDED_BY(m_mutex); std::vector m_registered_callbacks GUARDED_BY(m_mutex); mutable util::CheckedMutex m_mutex; - bool m_db_first_open{true}; + const bool m_db_first_open; + bool m_sync_schema_migration_required GUARDED_BY(m_mutex) = false; }; } // namespace realm diff --git a/src/realm/object-store/sync/sync_session.cpp b/src/realm/object-store/sync/sync_session.cpp index 468bb4abb7d..11e6c8ebcb5 100644 --- a/src/realm/object-store/sync/sync_session.cpp +++ b/src/realm/object-store/sync/sync_session.cpp @@ -749,6 +749,16 @@ void SyncSession::handle_error(sync::SessionErrorInfo error) next_state = NextStateAfterError::inactive; log_out_user = true; break; + case sync::ProtocolErrorInfo::Action::MigrateSchema: + std::function callback; + { + util::CheckedLockGuard l(m_state_mutex); + callback = std::move(m_sync_schema_migration_callback); + } + if (callback) { + callback(); + } + return; // do not propgate the error to the user at this point } } else { @@ -901,6 +911,7 @@ void SyncSession::create_sync_session() session_config.flx_bootstrap_batch_size_bytes = sync_config.flx_bootstrap_batch_size_bytes; session_config.session_reason = client_reset::is_fresh_path(m_config.path) ? sync::SessionReason::ClientReset : sync::SessionReason::Sync; + session_config.schema_version = m_config.schema_version; if (sync_config.on_sync_client_event_hook) { session_config.on_sync_client_event_hook = [hook = sync_config.on_sync_client_event_hook, @@ -989,6 +1000,12 @@ void SyncSession::create_sync_session() }); } +void SyncSession::set_sync_schema_migration_callback(std::function&& callback) +{ + util::CheckedLockGuard l(m_state_mutex); + m_sync_schema_migration_callback = std::move(callback); +} + void SyncSession::nonsync_transact_notify(sync::version_type version) { m_progress_notifier.set_local_version(version); diff --git a/src/realm/object-store/sync/sync_session.hpp b/src/realm/object-store/sync/sync_session.hpp index 07e0e4e298a..25d4d48aee4 100644 --- a/src/realm/object-store/sync/sync_session.hpp +++ b/src/realm/object-store/sync/sync_session.hpp @@ -274,6 +274,7 @@ class SyncSession : public std::enable_shared_from_this { class Internal { friend class _impl::RealmCoordinator; friend struct OnlyForTesting; + friend class AsyncOpenTask; static void nonsync_transact_notify(SyncSession& session, VersionID::version_type version) { @@ -285,6 +286,11 @@ class SyncSession : public std::enable_shared_from_this { return session.m_db; } + static void set_sync_schema_migration_callback(SyncSession& session, std::function&& callback) + { + session.set_sync_schema_migration_callback(std::move(callback)); + } + static util::Future pause_async(SyncSession& session); }; @@ -397,6 +403,8 @@ class SyncSession : public std::enable_shared_from_this { void nonsync_transact_notify(VersionID::version_type) REQUIRES(!m_state_mutex); + void set_sync_schema_migration_callback(std::function&&) REQUIRES(!m_state_mutex); + void create_sync_session() REQUIRES(m_state_mutex, !m_config_mutex); void did_drop_external_reference() REQUIRES(!m_state_mutex, !m_config_mutex, !m_external_reference_mutex, !m_connection_state_mutex); @@ -431,6 +439,8 @@ class SyncSession : public std::enable_shared_from_this { std::function m_sync_transact_callback GUARDED_BY(m_state_mutex); + std::function m_sync_schema_migration_callback GUARDED_BY(m_state_mutex); + template auto config(Field f) REQUIRES(!m_config_mutex) { diff --git a/src/realm/sync/noinst/client_impl_base.cpp b/src/realm/sync/noinst/client_impl_base.cpp index cb1bc1875d2..4c80849b963 100644 --- a/src/realm/sync/noinst/client_impl_base.cpp +++ b/src/realm/sync/noinst/client_impl_base.cpp @@ -2537,6 +2537,17 @@ Status Session::receive_error_message(const ProtocolErrorInfo& info) return Status::OK(); } + if (protocol_error == ProtocolError::schema_version_changed) { + // Enable upload immediately if the session is still active. + if (m_state == Active) { + m_allow_upload = true; + // Notify SyncSession a schema migration is required. + on_connection_state_changed(m_conn.get_state(), SessionErrorInfo{info}); + } + // Keep the session active to upload any unsynced changes. + return Status::OK(); + } + m_error_message_received = true; suspend(SessionErrorInfo{info, std::move(status)}); return Status::OK(); diff --git a/src/realm/sync/noinst/protocol_codec.hpp b/src/realm/sync/noinst/protocol_codec.hpp index eeaf71c569c..f60818ffd70 100644 --- a/src/realm/sync/noinst/protocol_codec.hpp +++ b/src/realm/sync/noinst/protocol_codec.hpp @@ -490,6 +490,7 @@ class ClientProtocol { {"RefreshUser", action::RefreshUser}, {"RefreshLocation", action::RefreshLocation}, {"LogOutUser", action::LogOutUser}, + {"MigrateSchema", action::MigrateSchema}, }; if (auto action_it = mapping.find(action_string); action_it != mapping.end()) { diff --git a/src/realm/sync/protocol.cpp b/src/realm/sync/protocol.cpp index b1c214fe784..525604f7853 100644 --- a/src/realm/sync/protocol.cpp +++ b/src/realm/sync/protocol.cpp @@ -123,6 +123,11 @@ const char* get_protocol_error_message(int error_code) noexcept case ProtocolError::revert_to_pbs: return "Server rolled back after flexible sync migration - reverting client to partition based " "sync"; + case ProtocolError::bad_schema_version: + return "Client tried to open a session with an invalid schema version (BIND)"; + case ProtocolError::schema_version_changed: + return "Client opened a session with a new valid schema version - migrating client to use new schema " + "version (BIND)"; } return nullptr; } @@ -214,6 +219,10 @@ Status protocol_error_to_status(ProtocolError error_code, std::string_view msg) [[fallthrough]]; case ProtocolError::revert_to_pbs: return ErrorCodes::WrongSyncType; + case ProtocolError::bad_schema_version: + [[fallthrough]]; + case ProtocolError::schema_version_changed: + return ErrorCodes::SyncSchemaMigrationError; case ProtocolError::limits_exceeded: [[fallthrough]]; diff --git a/src/realm/sync/protocol.hpp b/src/realm/sync/protocol.hpp index 3687ffc17e3..7f4009079c1 100644 --- a/src/realm/sync/protocol.hpp +++ b/src/realm/sync/protocol.hpp @@ -271,6 +271,7 @@ struct ProtocolErrorInfo { RefreshUser, RefreshLocation, LogOutUser, + MigrateSchema, }; ProtocolErrorInfo() = default; @@ -364,6 +365,8 @@ enum class ProtocolError { migrate_to_flx = RLM_SYNC_ERR_SESSION_MIGRATE_TO_FLX, // Server migrated from PBS to FLX - migrate client to FLX (BIND) bad_progress = RLM_SYNC_ERR_SESSION_BAD_PROGRESS, // Bad progress information (ERROR) revert_to_pbs = RLM_SYNC_ERR_SESSION_REVERT_TO_PBS, // Server rolled back to PBS after FLX migration - revert FLX client migration (BIND) + bad_schema_version = RLM_SYNC_ERR_SESSION_BAD_SCHEMA_VERSION, // Client tried to open a session with an invalid schema version (BIND) + schema_version_changed = RLM_SYNC_ERR_SESSION_SCHEMA_VERSION_CHANGED, // Client opened a session with a new valid schema version - migrate client to use new schema version (BIND) // clang-format on }; @@ -441,6 +444,8 @@ inline std::ostream& operator<<(std::ostream& o, ProtocolErrorInfo::Action actio return o << "RefreshLocation"; case ProtocolErrorInfo::Action::LogOutUser: return o << "LogOutUser"; + case ProtocolErrorInfo::Action::MigrateSchema: + return o << "MigrateSchema"; } return o << "Invalid error action: " << int64_t(action); } diff --git a/test/object-store/sync/session/session.cpp b/test/object-store/sync/session/session.cpp index b5972b1ef2f..fdc2eea0884 100644 --- a/test/object-store/sync/session/session.cpp +++ b/test/object-store/sync/session/session.cpp @@ -312,8 +312,7 @@ TEST_CASE("SyncSession: internal pause_async API", "[sync][session]") { TestSyncManager init_sync_manager; auto app = init_sync_manager.app(); auto user = app->sync_manager()->get_user("close-api-tests-user", ENCODE_FAKE_JWT("fake_refresh_token"), - ENCODE_FAKE_JWT("fake_access_token"), "https://realm.example.org", - dummy_device_id); + ENCODE_FAKE_JWT("fake_access_token"), dummy_device_id); auto session = sync_session( user, "/test-close-for-active", [](auto, auto) {}, SyncSessionStopPolicy::AfterChangesUploaded);