Skip to content

Commit

Permalink
Upload data and migrate schema (#6944)
Browse files Browse the repository at this point in the history
* Main work of sync schema migrations

* misc

* Code review changes
  • Loading branch information
danieltabacaru committed Nov 14, 2023
1 parent 690d78a commit 6114e61
Show file tree
Hide file tree
Showing 15 changed files with 183 additions and 25 deletions.
1 change: 1 addition & 0 deletions src/realm/error_codes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ ErrorCategory ErrorCodes::error_categories(Error code)
case SyncServerPermissionsChanged:
case SyncUserMismatch:
case SyncWriteNotAllowed:
case SyncSchemaMigrationError:
return ErrorCategory().set(ErrorCategory::runtime_error).set(ErrorCategory::sync_error);

case SyncConnectFailed:
Expand Down
3 changes: 3 additions & 0 deletions src/realm/error_codes.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ typedef enum realm_errno {
RLM_ERR_TLS_HANDSHAKE_FAILED = 1042,
RLM_ERR_WRONG_SYNC_TYPE = 1043,
RLM_ERR_SYNC_WRITE_NOT_ALLOWED = 1044,
RLM_ERR_SYNC_SCHEMA_MIGRATION_ERROR = 1045,

RLM_ERR_SYSTEM_ERROR = 1999,

Expand Down Expand Up @@ -262,6 +263,8 @@ typedef enum realm_sync_errno_session {
RLM_SYNC_ERR_SESSION_MIGRATE_TO_FLX = 232,
RLM_SYNC_ERR_SESSION_BAD_PROGRESS = 233,
RLM_SYNC_ERR_SESSION_REVERT_TO_PBS = 234,
RLM_SYNC_ERR_SESSION_BAD_SCHEMA_VERSION = 235,
RLM_SYNC_ERR_SESSION_SCHEMA_VERSION_CHANGED = 236,
// Error code 299 is reserved as an "unknown session error" in tests
} realm_sync_errno_session_e;

Expand Down
1 change: 1 addition & 0 deletions src/realm/error_codes.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ class ErrorCodes {
TlsHandshakeFailed = RLM_ERR_TLS_HANDSHAKE_FAILED,
WrongSyncType = RLM_ERR_WRONG_SYNC_TYPE,
SyncWriteNotAllowed = RLM_ERR_SYNC_WRITE_NOT_ALLOWED,
SyncSchemaMigrationError = RLM_ERR_SYNC_SCHEMA_MIGRATION_ERROR,

SystemError = RLM_ERR_SYSTEM_ERROR,

Expand Down
7 changes: 7 additions & 0 deletions src/realm/object-store/impl/realm_coordinator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,13 @@ void RealmCoordinator::delete_and_reopen()
util::CheckedLockGuard lock(m_realm_mutex);
close();
util::File::remove(m_config.path);
#if REALM_ENABLE_SYNC
// Close the sync session.
if (m_sync_session) {
m_sync_session->force_close();
m_sync_session = nullptr;
}
#endif
open_db();
}

Expand Down
17 changes: 13 additions & 4 deletions src/realm/object-store/shared_realm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,7 @@ bool Realm::schema_change_needs_write_transaction(Schema& schema, std::vector<Sc

switch (m_config.schema_mode) {
case SchemaMode::Automatic:
if (version < m_schema_version && m_schema_version != ObjectStore::NotVersioned)
throw InvalidSchemaVersionException(m_schema_version, version, false);
verify_schema_version_not_decreasing(version);
return true;

case SchemaMode::Immutable:
Expand Down Expand Up @@ -321,8 +320,7 @@ bool Realm::schema_change_needs_write_transaction(Schema& schema, std::vector<Sc
}

case SchemaMode::Manual:
if (version < m_schema_version && m_schema_version != ObjectStore::NotVersioned)
throw InvalidSchemaVersionException(m_schema_version, version, false);
verify_schema_version_not_decreasing(version);
if (version == m_schema_version) {
ObjectStore::verify_no_changes_required(changes);
REALM_UNREACHABLE(); // changes is non-empty so above line always throws
Expand All @@ -332,6 +330,17 @@ bool Realm::schema_change_needs_write_transaction(Schema& schema, std::vector<Sc
REALM_COMPILER_HINT_UNREACHABLE();
}

// Schema version is not allowed to decrease for local realms.
void Realm::verify_schema_version_not_decreasing(uint64_t version)
{
#if REALM_ENABLE_SYNC
if (m_config.sync_config)
return;
#endif
if (version < m_schema_version && m_schema_version != ObjectStore::NotVersioned)
throw InvalidSchemaVersionException(m_schema_version, version, false);
}

Schema Realm::get_full_schema()
{
if (!m_config.immutable())
Expand Down
3 changes: 2 additions & 1 deletion src/realm/object-store/shared_realm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ class Realm : public std::enable_shared_from_this<Realm> {
* will be thrown instead.
*
* If the destination file does not exist, the action performed depends on
* the type of the source and destimation files. If the destination
* the type of the source and destination files. If the destination
* configuration is a non-sync local Realm configuration, a compacted copy
* of the current Transaction's data (which includes uncommitted changes if
* applicable!) is written in streaming form, with no history.
Expand Down Expand Up @@ -550,6 +550,7 @@ class Realm : public std::enable_shared_from_this<Realm> {
void set_schema(Schema const& reference, Schema schema);
bool reset_file(Schema& schema, std::vector<SchemaChange>& changes_required);
bool schema_change_needs_write_transaction(Schema& schema, std::vector<SchemaChange>& changes, uint64_t version);
void verify_schema_version_not_decreasing(uint64_t version);
Schema get_full_schema();

// Ensure that m_schema and m_schema_version match that of the current
Expand Down
111 changes: 95 additions & 16 deletions src/realm/object-store/sync/async_open_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ AsyncOpenTask::AsyncOpenTask(std::shared_ptr<_impl::RealmCoordinator> coordinato
{
}

void AsyncOpenTask::start(AsyncOpenCallback async_open_complete)
void AsyncOpenTask::start(AsyncOpenCallback callback)
{
util::CheckedUniqueLock lock(m_mutex);
if (!m_session)
Expand All @@ -43,8 +43,7 @@ void AsyncOpenTask::start(AsyncOpenCallback async_open_complete)
lock.unlock();

std::shared_ptr<AsyncOpenTask> self(shared_from_this());
session->wait_for_download_completion([async_open_complete = std::move(async_open_complete), self,
this](Status status) mutable {
session->wait_for_download_completion([callback = std::move(callback), self, this](Status status) mutable {
std::shared_ptr<_impl::RealmCoordinator> coordinator;
{
util::CheckedLockGuard lock(m_mutex);
Expand All @@ -56,18 +55,19 @@ void AsyncOpenTask::start(AsyncOpenCallback async_open_complete)
}

if (!status.is_ok()) {
self->async_open_complete(std::move(async_open_complete), coordinator, status);
self->async_open_complete(std::move(callback), coordinator, status);
return;
}

auto config = coordinator->get_config();
if (config.sync_config && config.sync_config->flx_sync_requested &&
config.sync_config->subscription_initializer) {
const bool rerun_on_launch = config.sync_config->rerun_init_subscription_on_open;
self->attach_to_subscription_initializer(std::move(async_open_complete), coordinator, rerun_on_launch);
}
else {
self->async_open_complete(std::move(async_open_complete), coordinator, status);
self->migrate_schema_or_complete(std::move(callback), coordinator, status);
});
// The callback does not extend the lifetime of the task if it's never invoked.
SyncSession::Internal::set_sync_schema_migration_callback(*session, [weak_self = weak_from_this(), this]() {
if (auto self = weak_self.lock()) {
util::CheckedLockGuard lock(m_mutex);
if (!m_session)
return;
m_sync_schema_migration_required = true;
}
});
session->revive_if_needed();
Expand Down Expand Up @@ -118,7 +118,7 @@ void AsyncOpenTask::unregister_download_progress_notifier(uint64_t token)
m_session->unregister_progress_notifier(token);
}

void AsyncOpenTask::attach_to_subscription_initializer(AsyncOpenCallback&& async_open_callback,
void AsyncOpenTask::attach_to_subscription_initializer(AsyncOpenCallback&& callback,
std::shared_ptr<_impl::RealmCoordinator> coordinator,
bool rerun_on_launch)
{
Expand All @@ -136,13 +136,13 @@ void AsyncOpenTask::attach_to_subscription_initializer(AsyncOpenCallback&& async
// We need to wait until subscription initializer completes
std::shared_ptr<AsyncOpenTask> self(shared_from_this());
init_subscription.get_state_change_notification(sync::SubscriptionSet::State::Complete)
.get_async([self, coordinator, async_open_callback = std::move(async_open_callback)](
.get_async([self, coordinator, callback = std::move(callback)](
StatusWith<realm::sync::SubscriptionSet::State> state) mutable {
self->async_open_complete(std::move(async_open_callback), coordinator, state.get_status());
self->async_open_complete(std::move(callback), coordinator, state.get_status());
});
}
else {
async_open_complete(std::move(async_open_callback), coordinator, Status::OK());
async_open_complete(std::move(callback), coordinator, Status::OK());
}
}

Expand All @@ -151,6 +151,10 @@ void AsyncOpenTask::async_open_complete(AsyncOpenCallback&& callback,
{
{
util::CheckedLockGuard lock(m_mutex);
// 'Cancel' may have been called just before 'async_open_complete' is invoked.
if (!m_session)
return;

for (auto token : m_registered_callbacks) {
m_session->unregister_progress_notifier(token);
}
Expand All @@ -170,4 +174,79 @@ void AsyncOpenTask::async_open_complete(AsyncOpenCallback&& callback,
return callback({}, std::make_exception_ptr(Exception(status)));
}

void AsyncOpenTask::migrate_schema_or_complete(AsyncOpenCallback&& callback,
std::shared_ptr<_impl::RealmCoordinator> coordinator, Status status)
{
util::CheckedUniqueLock lock(m_mutex);
if (!m_session)
return;
auto session = m_session;
auto migrate_schema = m_sync_schema_migration_required;
lock.unlock();

if (!migrate_schema) {
wait_for_bootstrap_or_complete(std::move(callback), coordinator, status);
return;
}

// Migrate the schema.
// * First upload the changes at the old schema version
// * Then delete the realm, reopen it, and rebootstrap at new schema version
// The lifetime of the task is extended until the bootstrap completes.
std::shared_ptr<AsyncOpenTask> self(shared_from_this());
session->wait_for_upload_completion([callback = std::move(callback), coordinator, session, self,
this](Status status) mutable {
{
util::CheckedLockGuard lock(m_mutex);
if (!m_session)
return; // Swallow all events if the task has been cancelled.
}

if (!status.is_ok()) {
self->async_open_complete(std::move(callback), coordinator, status);
return;
}

auto future = SyncSession::Internal::pause_async(*session);
// Wait until the SessionWrapper is done using the DBRef.
std::move(future).get_async([callback = std::move(callback), coordinator, self, this](Status status) mutable {
{
util::CheckedLockGuard lock(m_mutex);
if (!m_session)
return; // Swallow all events if the task has been cancelled.
}

if (!status.is_ok()) {
self->async_open_complete(std::move(callback), coordinator, status);
return;
}

// Delete the realm file and reopen it.
{
util::CheckedLockGuard lock(m_mutex);
m_session = nullptr;
coordinator->delete_and_reopen();
m_session = coordinator->sync_session();
}

self->wait_for_bootstrap_or_complete(std::move(callback), coordinator, status);
});
});
}

void AsyncOpenTask::wait_for_bootstrap_or_complete(AsyncOpenCallback&& callback,
std::shared_ptr<_impl::RealmCoordinator> coordinator,
Status status)
{
auto config = coordinator->get_config();
if (config.sync_config && config.sync_config->flx_sync_requested &&
config.sync_config->subscription_initializer) {
const bool rerun_on_launch = config.sync_config->rerun_init_subscription_on_open;
attach_to_subscription_initializer(std::move(callback), coordinator, rerun_on_launch);
}
else {
async_open_complete(std::move(callback), coordinator, status);
}
}

} // namespace realm
9 changes: 7 additions & 2 deletions src/realm/object-store/sync/async_open_task.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class AsyncOpenTask : public std::enable_shared_from_this<AsyncOpenTask> {
//
// If multiple AsyncOpenTasks all attempt to download the same Realm and one of them is canceled,
// the other tasks will receive a "Cancelled" exception.
void start(AsyncOpenCallback async_open_complete) REQUIRES(!m_mutex);
void start(AsyncOpenCallback callback) REQUIRES(!m_mutex);

// Cancels the download and stops the session. No further functions should be called on this class.
void cancel() REQUIRES(!m_mutex);
Expand All @@ -62,12 +62,17 @@ class AsyncOpenTask : public std::enable_shared_from_this<AsyncOpenTask> {
REQUIRES(!m_mutex);
void attach_to_subscription_initializer(AsyncOpenCallback&&, std::shared_ptr<_impl::RealmCoordinator>, bool)
REQUIRES(!m_mutex);
void migrate_schema_or_complete(AsyncOpenCallback&&, std::shared_ptr<_impl::RealmCoordinator>, Status)
REQUIRES(!m_mutex);
void wait_for_bootstrap_or_complete(AsyncOpenCallback&&, std::shared_ptr<_impl::RealmCoordinator>, Status)
REQUIRES(!m_mutex);

std::shared_ptr<_impl::RealmCoordinator> m_coordinator GUARDED_BY(m_mutex);
std::shared_ptr<SyncSession> m_session GUARDED_BY(m_mutex);
std::vector<uint64_t> m_registered_callbacks GUARDED_BY(m_mutex);
mutable util::CheckedMutex m_mutex;
bool m_db_first_open{true};
const bool m_db_first_open;
bool m_sync_schema_migration_required GUARDED_BY(m_mutex) = false;
};

} // namespace realm
Expand Down
17 changes: 17 additions & 0 deletions src/realm/object-store/sync/sync_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,16 @@ void SyncSession::handle_error(sync::SessionErrorInfo error)
next_state = NextStateAfterError::inactive;
log_out_user = true;
break;
case sync::ProtocolErrorInfo::Action::MigrateSchema:
std::function<void()> callback;
{
util::CheckedLockGuard l(m_state_mutex);
callback = std::move(m_sync_schema_migration_callback);
}
if (callback) {
callback();
}
return; // do not propgate the error to the user at this point
}
}
else {
Expand Down Expand Up @@ -901,6 +911,7 @@ void SyncSession::create_sync_session()
session_config.flx_bootstrap_batch_size_bytes = sync_config.flx_bootstrap_batch_size_bytes;
session_config.session_reason =
client_reset::is_fresh_path(m_config.path) ? sync::SessionReason::ClientReset : sync::SessionReason::Sync;
session_config.schema_version = m_config.schema_version;

if (sync_config.on_sync_client_event_hook) {
session_config.on_sync_client_event_hook = [hook = sync_config.on_sync_client_event_hook,
Expand Down Expand Up @@ -989,6 +1000,12 @@ void SyncSession::create_sync_session()
});
}

void SyncSession::set_sync_schema_migration_callback(std::function<void()>&& callback)
{
util::CheckedLockGuard l(m_state_mutex);
m_sync_schema_migration_callback = std::move(callback);
}

void SyncSession::nonsync_transact_notify(sync::version_type version)
{
m_progress_notifier.set_local_version(version);
Expand Down
10 changes: 10 additions & 0 deletions src/realm/object-store/sync/sync_session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
class Internal {
friend class _impl::RealmCoordinator;
friend struct OnlyForTesting;
friend class AsyncOpenTask;

static void nonsync_transact_notify(SyncSession& session, VersionID::version_type version)
{
Expand All @@ -285,6 +286,11 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
return session.m_db;
}

static void set_sync_schema_migration_callback(SyncSession& session, std::function<void()>&& callback)
{
session.set_sync_schema_migration_callback(std::move(callback));
}

static util::Future<void> pause_async(SyncSession& session);
};

Expand Down Expand Up @@ -397,6 +403,8 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {

void nonsync_transact_notify(VersionID::version_type) REQUIRES(!m_state_mutex);

void set_sync_schema_migration_callback(std::function<void()>&&) REQUIRES(!m_state_mutex);

void create_sync_session() REQUIRES(m_state_mutex, !m_config_mutex);
void did_drop_external_reference()
REQUIRES(!m_state_mutex, !m_config_mutex, !m_external_reference_mutex, !m_connection_state_mutex);
Expand Down Expand Up @@ -431,6 +439,8 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {

std::function<TransactionCallback> m_sync_transact_callback GUARDED_BY(m_state_mutex);

std::function<void()> m_sync_schema_migration_callback GUARDED_BY(m_state_mutex);

template <typename Field>
auto config(Field f) REQUIRES(!m_config_mutex)
{
Expand Down
11 changes: 11 additions & 0 deletions src/realm/sync/noinst/client_impl_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2537,6 +2537,17 @@ Status Session::receive_error_message(const ProtocolErrorInfo& info)
return Status::OK();
}

if (protocol_error == ProtocolError::schema_version_changed) {
// Enable upload immediately if the session is still active.
if (m_state == Active) {
m_allow_upload = true;
// Notify SyncSession a schema migration is required.
on_connection_state_changed(m_conn.get_state(), SessionErrorInfo{info});
}
// Keep the session active to upload any unsynced changes.
return Status::OK();
}

m_error_message_received = true;
suspend(SessionErrorInfo{info, std::move(status)});
return Status::OK();
Expand Down
1 change: 1 addition & 0 deletions src/realm/sync/noinst/protocol_codec.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ class ClientProtocol {
{"RefreshUser", action::RefreshUser},
{"RefreshLocation", action::RefreshLocation},
{"LogOutUser", action::LogOutUser},
{"MigrateSchema", action::MigrateSchema},
};

if (auto action_it = mapping.find(action_string); action_it != mapping.end()) {
Expand Down
Loading

0 comments on commit 6114e61

Please sign in to comment.