Skip to content

Commit

Permalink
Add pause_async to SyncSession (#6845)
Browse files Browse the repository at this point in the history
* Add async shutdown of SyncSession

* SyncSession::shutdown returns a Future instead of passing callbacks around

* bug fix

* Fix the test

* A bit of redesign

* Improve test

* Relax check in test

* Rename shutdown() to pause_async(). Hide it from the public API.

* Remove unneeded header import
  • Loading branch information
danieltabacaru committed Nov 14, 2023
1 parent b763242 commit 690d78a
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 11 deletions.
6 changes: 6 additions & 0 deletions src/realm/object-store/sync/impl/sync_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ struct SyncClient {
m_client.wait_for_session_terminations_or_client_stopped();
}

// Async version of wait_for_session_terminations().
util::Future<void> notify_session_terminated()
{
return m_client.notify_session_terminated();
}

~SyncClient() {}

private:
Expand Down
27 changes: 25 additions & 2 deletions src/realm/object-store/sync/sync_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -631,11 +631,34 @@ void SyncSession::handle_fresh_realm_downloaded(DBRef db, Status status,
revive_if_needed();
}

util::Future<void> SyncSession::Internal::pause_async(SyncSession& session)
{
{
util::CheckedUniqueLock lock(session.m_state_mutex);
// Nothing to wait for if the session is already paused or inactive.
if (session.m_state == SyncSession::State::Paused || session.m_state == SyncSession::State::Inactive) {
return util::Future<void>::make_ready();
}
}
// Transition immediately to `paused` state. Calling this function must guarantee that any
// sync::Session object in SyncSession::m_session that existed prior to the time of invocation
// must have been destroyed upon return. This allows the caller to follow up with a call to
// sync::Client::notify_session_terminated() in order to be notified when the Realm file is closed. This works
// so long as this SyncSession object remains in the `paused` state after the invocation of shutdown().
session.pause();
return session.m_client.notify_session_terminated();
}

void SyncSession::OnlyForTesting::handle_error(SyncSession& session, sync::SessionErrorInfo&& error)
{
session.handle_error(std::move(error));
}

util::Future<void> SyncSession::OnlyForTesting::pause_async(SyncSession& session)
{
return SyncSession::Internal::pause_async(session);
}

// This method should only be called from within the error handler callback registered upon the underlying
// `m_session`.
void SyncSession::handle_error(sync::SessionErrorInfo error)
Expand Down Expand Up @@ -1108,7 +1131,7 @@ void SyncSession::close(util::CheckedUniqueLock lock)
break;
case State::Paused:
case State::Inactive: {
// We need to register from the sync manager if it still exists so that we don't end up
// We need to unregister from the sync manager if it still exists so that we don't end up
// holding the DBRef open after the session is closed. Otherwise we can end up preventing
// the user from deleting the realm when it's in the paused/inactive state.
if (m_sync_manager) {
Expand All @@ -1127,7 +1150,7 @@ void SyncSession::close(util::CheckedUniqueLock lock)
void SyncSession::shutdown_and_wait()
{
{
// Transition immediately to `inactive` state. Calling this function must gurantee that any
// Transition immediately to `inactive` state. Calling this function must guarantee that any
// sync::Session object in SyncSession::m_session that existed prior to the time of invocation
// must have been destroyed upon return. This allows the caller to follow up with a call to
// sync::Client::wait_for_session_terminations_or_client_stopped() in order to wait for the
Expand Down
9 changes: 8 additions & 1 deletion src/realm/object-store/sync/sync_session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <realm/sync/subscriptions.hpp>

#include <realm/util/checked_mutex.hpp>
#include <realm/util/future.hpp>
#include <realm/util/optional.hpp>
#include <realm/version_id.hpp>

Expand Down Expand Up @@ -118,7 +119,6 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
Connected,
};

using StateChangeCallback = void(State old_state, State new_state);
using ConnectionStateChangeCallback = void(ConnectionState old_state, ConnectionState new_state);
using TransactionCallback = void(VersionID old_version, VersionID new_version);
using ProgressNotifierCallback = _impl::SyncProgressNotifier::ProgressNotifierCallback;
Expand Down Expand Up @@ -267,10 +267,13 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
// Return an existing external reference to this session, if one exists. Otherwise, returns `nullptr`.
std::shared_ptr<SyncSession> existing_external_reference() REQUIRES(!m_external_reference_mutex);

struct OnlyForTesting;

// Expose some internal functionality to other parts of the ObjectStore
// without making it public to everyone
class Internal {
friend class _impl::RealmCoordinator;
friend struct OnlyForTesting;

static void nonsync_transact_notify(SyncSession& session, VersionID::version_type version)
{
Expand All @@ -281,6 +284,8 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
{
return session.m_db;
}

static util::Future<void> pause_async(SyncSession& session);
};

// Expose some internal functionality to testing code.
Expand Down Expand Up @@ -314,6 +319,8 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
{
return session.get_subscription_store_base();
}

static util::Future<void> pause_async(SyncSession& session);
};

private:
Expand Down
25 changes: 24 additions & 1 deletion src/realm/sync/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,23 @@ bool ClientImpl::wait_for_session_terminations_or_client_stopped()
}


// This relies on the same assumptions and guarantees as wait_for_session_terminations_or_client_stopped().
util::Future<void> ClientImpl::notify_session_terminated()
{
auto pf = util::make_promise_future<void>();
post([promise = std::move(pf.promise)](Status status) mutable {
// Includes operation_aborted
if (!status.is_ok()) {
promise.set_error(status);
return;
}

promise.emplace_value();
});

return std::move(pf.future);
}

void ClientImpl::drain_connections_on_loop()
{
post([this](Status status) mutable {
Expand Down Expand Up @@ -1103,8 +1120,10 @@ util::Future<std::string> SessionImpl::send_test_command(std::string body)

get_client().post([this, promise = std::move(pf.promise), body = std::move(body)](Status status) mutable {
// Includes operation_aborted
if (!status.is_ok())
if (!status.is_ok()) {
promise.set_error(status);
return;
}

auto id = ++m_last_pending_test_command_ident;
m_pending_test_commands.push_back(PendingTestCommand{id, std::move(body), std::move(promise)});
Expand Down Expand Up @@ -2021,6 +2040,10 @@ bool Client::wait_for_session_terminations_or_client_stopped()
return m_impl->wait_for_session_terminations_or_client_stopped();
}

util::Future<void> Client::notify_session_terminated()
{
return m_impl->notify_session_terminated();
}

bool Client::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
port_type& port, std::string& path) const
Expand Down
10 changes: 3 additions & 7 deletions src/realm/sync/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,6 @@ class Client {
Client(Client&&) noexcept;
~Client() noexcept;

/// Run the internal event-loop of the client. At most one thread may
/// execute run() at any given time. The call will not return until somebody
/// calls stop().
void run() noexcept;

/// See run().
///
/// Thread-safe.
void shutdown() noexcept;

Expand Down Expand Up @@ -100,6 +93,9 @@ class Client {
/// by any thread, and by multiple threads concurrently.
bool wait_for_session_terminations_or_client_stopped();

/// Async version of wait_for_session_terminations_or_client_stopped().
util::Future<void> notify_session_terminated();

/// Returns false if the specified URL is invalid.
bool decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
port_type& port, std::string& path) const;
Expand Down
2 changes: 2 additions & 0 deletions src/realm/sync/noinst/client_impl_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ class ClientImpl {

void cancel_reconnect_delay();
bool wait_for_session_terminations_or_client_stopped();
// Async version of wait_for_session_terminations_or_client_stopped().
util::Future<void> notify_session_terminated();
void voluntary_disconnect_all_connections();

private:
Expand Down
24 changes: 24 additions & 0 deletions test/object-store/sync/session/session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,30 @@ TEST_CASE("SyncSession: shutdown_and_wait() API", "[sync][session]") {
}
}

TEST_CASE("SyncSession: internal pause_async API", "[sync][session]") {
TestSyncManager init_sync_manager;
auto app = init_sync_manager.app();
auto user = app->sync_manager()->get_user("close-api-tests-user", ENCODE_FAKE_JWT("fake_refresh_token"),
ENCODE_FAKE_JWT("fake_access_token"), "https://realm.example.org",
dummy_device_id);

auto session = sync_session(
user, "/test-close-for-active", [](auto, auto) {}, SyncSessionStopPolicy::AfterChangesUploaded);
EventLoop::main().run_until([&] {
return sessions_are_active(*session);
});
REQUIRE(sessions_are_active(*session));
auto dbref = SyncSession::OnlyForTesting::get_db(*session);
auto before = dbref.use_count();
auto future = SyncSession::OnlyForTesting::pause_async(*session);
future.get();
auto after = dbref.use_count();
// Check SessionImpl released the sync agent as result of SessionWrapper::finalize() being called.
REQUIRE_NOTHROW(dbref->claim_sync_agent());
// Check DBRef is released in SessionWrapper::finalize().
REQUIRE(after < before);
}

TEST_CASE("SyncSession: update_configuration()", "[sync][session]") {
TestSyncManager init_sync_manager({}, {false});
auto app = init_sync_manager.app();
Expand Down

0 comments on commit 690d78a

Please sign in to comment.