diff --git a/src/realm/object-store/sync/impl/sync_client.hpp b/src/realm/object-store/sync/impl/sync_client.hpp index c0c896a133f..73abfa1b10f 100644 --- a/src/realm/object-store/sync/impl/sync_client.hpp +++ b/src/realm/object-store/sync/impl/sync_client.hpp @@ -132,6 +132,12 @@ struct SyncClient { m_client.wait_for_session_terminations_or_client_stopped(); } + // Async version of wait_for_session_terminations(). + util::Future notify_session_terminated() + { + return m_client.notify_session_terminated(); + } + ~SyncClient() {} private: diff --git a/src/realm/object-store/sync/sync_session.cpp b/src/realm/object-store/sync/sync_session.cpp index 98336d6a5e0..468bb4abb7d 100644 --- a/src/realm/object-store/sync/sync_session.cpp +++ b/src/realm/object-store/sync/sync_session.cpp @@ -631,11 +631,34 @@ void SyncSession::handle_fresh_realm_downloaded(DBRef db, Status status, revive_if_needed(); } +util::Future SyncSession::Internal::pause_async(SyncSession& session) +{ + { + util::CheckedUniqueLock lock(session.m_state_mutex); + // Nothing to wait for if the session is already paused or inactive. + if (session.m_state == SyncSession::State::Paused || session.m_state == SyncSession::State::Inactive) { + return util::Future::make_ready(); + } + } + // Transition immediately to `paused` state. Calling this function must guarantee that any + // sync::Session object in SyncSession::m_session that existed prior to the time of invocation + // must have been destroyed upon return. This allows the caller to follow up with a call to + // sync::Client::notify_session_terminated() in order to be notified when the Realm file is closed. This works + // so long as this SyncSession object remains in the `paused` state after the invocation of shutdown(). + session.pause(); + return session.m_client.notify_session_terminated(); +} + void SyncSession::OnlyForTesting::handle_error(SyncSession& session, sync::SessionErrorInfo&& error) { session.handle_error(std::move(error)); } +util::Future SyncSession::OnlyForTesting::pause_async(SyncSession& session) +{ + return SyncSession::Internal::pause_async(session); +} + // This method should only be called from within the error handler callback registered upon the underlying // `m_session`. void SyncSession::handle_error(sync::SessionErrorInfo error) @@ -1108,7 +1131,7 @@ void SyncSession::close(util::CheckedUniqueLock lock) break; case State::Paused: case State::Inactive: { - // We need to register from the sync manager if it still exists so that we don't end up + // We need to unregister from the sync manager if it still exists so that we don't end up // holding the DBRef open after the session is closed. Otherwise we can end up preventing // the user from deleting the realm when it's in the paused/inactive state. if (m_sync_manager) { @@ -1127,7 +1150,7 @@ void SyncSession::close(util::CheckedUniqueLock lock) void SyncSession::shutdown_and_wait() { { - // Transition immediately to `inactive` state. Calling this function must gurantee that any + // Transition immediately to `inactive` state. Calling this function must guarantee that any // sync::Session object in SyncSession::m_session that existed prior to the time of invocation // must have been destroyed upon return. This allows the caller to follow up with a call to // sync::Client::wait_for_session_terminations_or_client_stopped() in order to wait for the diff --git a/src/realm/object-store/sync/sync_session.hpp b/src/realm/object-store/sync/sync_session.hpp index d6035cc3af4..07e0e4e298a 100644 --- a/src/realm/object-store/sync/sync_session.hpp +++ b/src/realm/object-store/sync/sync_session.hpp @@ -26,6 +26,7 @@ #include #include +#include #include #include @@ -118,7 +119,6 @@ class SyncSession : public std::enable_shared_from_this { Connected, }; - using StateChangeCallback = void(State old_state, State new_state); using ConnectionStateChangeCallback = void(ConnectionState old_state, ConnectionState new_state); using TransactionCallback = void(VersionID old_version, VersionID new_version); using ProgressNotifierCallback = _impl::SyncProgressNotifier::ProgressNotifierCallback; @@ -267,10 +267,13 @@ class SyncSession : public std::enable_shared_from_this { // Return an existing external reference to this session, if one exists. Otherwise, returns `nullptr`. std::shared_ptr existing_external_reference() REQUIRES(!m_external_reference_mutex); + struct OnlyForTesting; + // Expose some internal functionality to other parts of the ObjectStore // without making it public to everyone class Internal { friend class _impl::RealmCoordinator; + friend struct OnlyForTesting; static void nonsync_transact_notify(SyncSession& session, VersionID::version_type version) { @@ -281,6 +284,8 @@ class SyncSession : public std::enable_shared_from_this { { return session.m_db; } + + static util::Future pause_async(SyncSession& session); }; // Expose some internal functionality to testing code. @@ -314,6 +319,8 @@ class SyncSession : public std::enable_shared_from_this { { return session.get_subscription_store_base(); } + + static util::Future pause_async(SyncSession& session); }; private: diff --git a/src/realm/sync/client.cpp b/src/realm/sync/client.cpp index 3b94234186e..319a68d55b0 100644 --- a/src/realm/sync/client.cpp +++ b/src/realm/sync/client.cpp @@ -453,6 +453,23 @@ bool ClientImpl::wait_for_session_terminations_or_client_stopped() } +// This relies on the same assumptions and guarantees as wait_for_session_terminations_or_client_stopped(). +util::Future ClientImpl::notify_session_terminated() +{ + auto pf = util::make_promise_future(); + post([promise = std::move(pf.promise)](Status status) mutable { + // Includes operation_aborted + if (!status.is_ok()) { + promise.set_error(status); + return; + } + + promise.emplace_value(); + }); + + return std::move(pf.future); +} + void ClientImpl::drain_connections_on_loop() { post([this](Status status) mutable { @@ -1103,8 +1120,10 @@ util::Future SessionImpl::send_test_command(std::string body) get_client().post([this, promise = std::move(pf.promise), body = std::move(body)](Status status) mutable { // Includes operation_aborted - if (!status.is_ok()) + if (!status.is_ok()) { promise.set_error(status); + return; + } auto id = ++m_last_pending_test_command_ident; m_pending_test_commands.push_back(PendingTestCommand{id, std::move(body), std::move(promise)}); @@ -2021,6 +2040,10 @@ bool Client::wait_for_session_terminations_or_client_stopped() return m_impl->wait_for_session_terminations_or_client_stopped(); } +util::Future Client::notify_session_terminated() +{ + return m_impl->notify_session_terminated(); +} bool Client::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address, port_type& port, std::string& path) const diff --git a/src/realm/sync/client.hpp b/src/realm/sync/client.hpp index 12aa8954af9..d70e799506a 100644 --- a/src/realm/sync/client.hpp +++ b/src/realm/sync/client.hpp @@ -38,13 +38,6 @@ class Client { Client(Client&&) noexcept; ~Client() noexcept; - /// Run the internal event-loop of the client. At most one thread may - /// execute run() at any given time. The call will not return until somebody - /// calls stop(). - void run() noexcept; - - /// See run(). - /// /// Thread-safe. void shutdown() noexcept; @@ -100,6 +93,9 @@ class Client { /// by any thread, and by multiple threads concurrently. bool wait_for_session_terminations_or_client_stopped(); + /// Async version of wait_for_session_terminations_or_client_stopped(). + util::Future notify_session_terminated(); + /// Returns false if the specified URL is invalid. bool decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address, port_type& port, std::string& path) const; diff --git a/src/realm/sync/noinst/client_impl_base.hpp b/src/realm/sync/noinst/client_impl_base.hpp index f5cadcddc62..fd116802afd 100644 --- a/src/realm/sync/noinst/client_impl_base.hpp +++ b/src/realm/sync/noinst/client_impl_base.hpp @@ -230,6 +230,8 @@ class ClientImpl { void cancel_reconnect_delay(); bool wait_for_session_terminations_or_client_stopped(); + // Async version of wait_for_session_terminations_or_client_stopped(). + util::Future notify_session_terminated(); void voluntary_disconnect_all_connections(); private: diff --git a/test/object-store/sync/session/session.cpp b/test/object-store/sync/session/session.cpp index 7839507ff9e..b5972b1ef2f 100644 --- a/test/object-store/sync/session/session.cpp +++ b/test/object-store/sync/session/session.cpp @@ -308,6 +308,30 @@ TEST_CASE("SyncSession: shutdown_and_wait() API", "[sync][session]") { } } +TEST_CASE("SyncSession: internal pause_async API", "[sync][session]") { + TestSyncManager init_sync_manager; + auto app = init_sync_manager.app(); + auto user = app->sync_manager()->get_user("close-api-tests-user", ENCODE_FAKE_JWT("fake_refresh_token"), + ENCODE_FAKE_JWT("fake_access_token"), "https://realm.example.org", + dummy_device_id); + + auto session = sync_session( + user, "/test-close-for-active", [](auto, auto) {}, SyncSessionStopPolicy::AfterChangesUploaded); + EventLoop::main().run_until([&] { + return sessions_are_active(*session); + }); + REQUIRE(sessions_are_active(*session)); + auto dbref = SyncSession::OnlyForTesting::get_db(*session); + auto before = dbref.use_count(); + auto future = SyncSession::OnlyForTesting::pause_async(*session); + future.get(); + auto after = dbref.use_count(); + // Check SessionImpl released the sync agent as result of SessionWrapper::finalize() being called. + REQUIRE_NOTHROW(dbref->claim_sync_agent()); + // Check DBRef is released in SessionWrapper::finalize(). + REQUIRE(after < before); +} + TEST_CASE("SyncSession: update_configuration()", "[sync][session]") { TestSyncManager init_sync_manager({}, {false}); auto app = init_sync_manager.app();