Skip to content

Commit

Permalink
Handle websocket errors entirely within sync client (#6859)
Browse files Browse the repository at this point in the history
  • Loading branch information
jbreams authored Aug 8, 2023
1 parent 8240223 commit a80ab11
Show file tree
Hide file tree
Showing 24 changed files with 341 additions and 375 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* Fix failed assertion for unknown app server errors ([#6758](https://github.com/realm/realm-core/issues/6758), since v12.9.0).

### Breaking changes
* None.
* The `WebSocketObserver` interface in the sync `SocketProvider` API now takes a `WebSocketError` enum/`std::string_view` for the `websocket_closed_handler()` instead of a `Status`. Implementers of platform networking should make sure all their error handling is expressed in terms of the WebSocketError enum. ([PR #6859](https://github.com/realm/realm-core/pull/6859))

### Compatibility
* Fileformat: Generates files with format v23. Reads and automatically upgrade from fileformat v5.
Expand Down
4 changes: 2 additions & 2 deletions src/realm.h
Original file line number Diff line number Diff line change
Expand Up @@ -4106,8 +4106,8 @@ RLM_API realm_sync_socket_t* realm_sync_socket_new(
realm_sync_socket_websocket_async_write_func_t websocket_write_func,
realm_sync_socket_websocket_free_func_t websocket_free_func);

RLM_API void realm_sync_socket_callback_complete(realm_sync_socket_callback_t* realm_callback,
realm_web_socket_errno_e status, const char* reason);
RLM_API void realm_sync_socket_callback_complete(realm_sync_socket_callback_t* realm_callback, realm_errno_e status,
const char* reason);

RLM_API void realm_sync_socket_websocket_connected(realm_websocket_observer_t* realm_websocket_observer,
const char* protocol);
Expand Down
21 changes: 8 additions & 13 deletions src/realm/object-store/c_api/socket_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ struct CAPIWebSocketObserver : sync::WebSocketObserver {
return m_observer->websocket_binary_message_received(data);
}

bool websocket_closed_handler(bool was_clean, Status status) final
bool websocket_closed_handler(bool was_clean, sync::websocket::WebSocketError code, std::string_view msg) final
{
return m_observer->websocket_closed_handler(was_clean, status);
return m_observer->websocket_closed_handler(was_clean, code, msg);
}

private:
Expand Down Expand Up @@ -218,13 +218,11 @@ RLM_API realm_sync_socket_t* realm_sync_socket_new(
});
}

RLM_API void realm_sync_socket_callback_complete(realm_sync_socket_callback* realm_callback,
realm_web_socket_errno_e code, const char* reason)
RLM_API void realm_sync_socket_callback_complete(realm_sync_socket_callback* realm_callback, realm_errno_e code,
const char* reason)
{
auto status = sync::websocket::WebSocketError(code);
auto complete_status = code == realm_web_socket_errno_e::RLM_ERR_WEBSOCKET_OK
? Status::OK()
: Status{sync::websocket::make_error_code(status), reason};
auto complete_status =
code == realm_errno_e::RLM_ERR_NONE ? Status::OK() : Status{static_cast<ErrorCodes::Error>(code), reason};
(*(realm_callback->get()))(complete_status);
realm_release(realm_callback);
}
Expand All @@ -249,11 +247,8 @@ RLM_API void realm_sync_socket_websocket_message(realm_websocket_observer_t* rea
RLM_API void realm_sync_socket_websocket_closed(realm_websocket_observer_t* realm_websocket_observer, bool was_clean,
realm_web_socket_errno_e code, const char* reason)
{
auto status = sync::websocket::WebSocketError(code);
auto closed_status = code == realm_web_socket_errno_e::RLM_ERR_WEBSOCKET_OK
? Status::OK()
: Status{sync::websocket::make_error_code(status), reason};
realm_websocket_observer->get()->websocket_closed_handler(was_clean, closed_status);
realm_websocket_observer->get()->websocket_closed_handler(
was_clean, static_cast<sync::websocket::WebSocketError>(code), reason);
}

RLM_API void realm_sync_client_config_set_sync_socket(realm_sync_client_config_t* config,
Expand Down
8 changes: 1 addition & 7 deletions src/realm/object-store/c_api/sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,6 @@ realm_sync_error_code_t to_capi(const Status& status, std::string& message)
else if (category == std::system_category() || category == realm::util::error::basic_system_error_category()) {
ret.category = RLM_SYNC_ERROR_CATEGORY_SYSTEM;
}
else if (category == realm::sync::websocket::websocket_error_category()) {
ret.category = RLM_SYNC_ERROR_CATEGORY_WEBSOCKET;
}
else {
ret.category = RLM_SYNC_ERROR_CATEGORY_UNKNOWN;
}
Expand Down Expand Up @@ -178,9 +175,6 @@ void sync_error_to_error_code(const realm_sync_error_code_t& sync_error_code, st
else if (category == RLM_SYNC_ERROR_CATEGORY_SYSTEM) {
error_code_out->assign(sync_error_code.value, std::system_category());
}
else if (category == RLM_SYNC_ERROR_CATEGORY_WEBSOCKET) {
error_code_out->assign(sync_error_code.value, realm::sync::websocket::websocket_error_category());
}
else if (category == RLM_SYNC_ERROR_CATEGORY_UNKNOWN) {
error_code_out->assign(sync_error_code.value, realm::util::error::basic_system_error_category());
}
Expand Down Expand Up @@ -902,7 +896,7 @@ RLM_API void realm_sync_session_handle_error_for_testing(const realm_sync_sessio
std::error_code err;
sync_error_to_error_code(sync_error, &err);
SyncSession::OnlyForTesting::handle_error(*session->get(),
sync::SessionErrorInfo{Status{err, error_message}, !is_fatal});
sync::SessionErrorInfo{Status{err, error_message}, IsFatal{is_fatal}});
}

} // namespace realm::c_api
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ struct EmscriptenWebSocket final : public WebSocketInterface {
{
auto observer = reinterpret_cast<WebSocketObserver*>(user_data);
REALM_ASSERT(event->code >= 1000 && event->code < 5000);
auto status = event->code == 1000 ? Status::OK() : Status(ErrorCodes::Error(event->code), event->reason);
observer->websocket_closed_handler(event->wasClean, std::move(status));
observer->websocket_closed_handler(event->wasClean, static_cast<websocket::WebSocketError>(event->code),
event->reason);
return EM_TRUE;
}

Expand Down
52 changes: 16 additions & 36 deletions src/realm/object-store/sync/sync_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -579,11 +579,10 @@ void SyncSession::handle_fresh_realm_downloaded(DBRef db, Status status,
}
lock.unlock();

const bool try_again = false;
sync::SessionErrorInfo synthetic(
Status{ErrorCodes::AutoClientResetFailed,
util::format("A fatal error occurred during client reset: '%1'", status.reason())},
try_again);
sync::IsFatal{true});
handle_error(synthetic);
return;
}
Expand Down Expand Up @@ -633,7 +632,7 @@ void SyncSession::OnlyForTesting::handle_error(SyncSession& session, sync::Sessi
void SyncSession::handle_error(sync::SessionErrorInfo error)
{
enum class NextStateAfterError { none, inactive, error };
auto next_state = error.is_fatal() ? NextStateAfterError::error : NextStateAfterError::none;
auto next_state = error.is_fatal ? NextStateAfterError::error : NextStateAfterError::none;
auto error_code = error.status.get_std_error_code();
util::Optional<ShouldBackup> delete_file;
bool log_out_user = false;
Expand Down Expand Up @@ -708,54 +707,35 @@ void SyncSession::handle_error(sync::SessionErrorInfo error)
save_sync_config_after_migration_or_rollback();
download_fresh_realm(error.server_requests_action);
return;
case sync::ProtocolErrorInfo::Action::RefreshUser:
if (auto u = user()) {
u->refresh_custom_data(false, handle_refresh(shared_from_this(), false));
return;
}
break;
case sync::ProtocolErrorInfo::Action::RefreshLocation:
if (auto u = user()) {
u->refresh_custom_data(true, handle_refresh(shared_from_this(), true));
return;
}
break;
}
}
else if (error_code.category() == sync::websocket::websocket_error_category()) {
using WebSocketError = sync::websocket::WebSocketError;
auto websocket_error = static_cast<WebSocketError>(error_code.value());

// The server replies with '401: unauthorized' if the access token is invalid, expired, revoked, or the user
// is disabled. In this scenario we attempt an automatic token refresh and if that succeeds continue as
// normal. If the refresh request also fails with 401 then we need to stop retrying and pass along the error;
// see handle_refresh().
bool redirect_occurred = websocket_error == WebSocketError::websocket_moved_permanently;
if (redirect_occurred || websocket_error == WebSocketError::websocket_unauthorized ||
websocket_error == WebSocketError::websocket_abnormal_closure) {
if (auto u = user()) {
// If a redirection occurred, the location metadata will be updated before refreshing the access
// token.
u->refresh_custom_data(redirect_occurred, handle_refresh(shared_from_this(), redirect_occurred));
return;
}
}

// If the websocket was closed cleanly or if the socket disappeared, don't notify the user as an error
// since the sync client will retry.
if (websocket_error == WebSocketError::websocket_read_error ||
websocket_error == WebSocketError::websocket_write_error) {
return;
}

// Surface a simplified websocket error to the user.
auto simplified_error = sync::websocket::get_simplified_websocket_error(websocket_error);
std::error_code new_error_code(simplified_error, sync::websocket::websocket_error_category());
error = sync::SessionErrorInfo(Status{new_error_code, error.message}, error.try_again);
}
else {
// Unrecognized error code.
unrecognized_by_client = true;
}

util::CheckedUniqueLock lock(m_state_mutex);
SyncError sync_error{error.status, error.is_fatal(), error.log_url, std::move(error.compensating_writes)};
SyncError sync_error{error.status, error.is_fatal, error.log_url, std::move(error.compensating_writes)};
// `action` is used over `shouldClientReset` and `isRecoveryModeDisabled`.
sync_error.server_requests_action = error.server_requests_action;
sync_error.is_unrecognized_by_client = unrecognized_by_client;

if (delete_file)
update_error_and_mark_file_for_deletion(sync_error, *delete_file);

if (m_state == State::Dying && error.is_fatal()) {
if (m_state == State::Dying && error.is_fatal) {
become_inactive(std::move(lock), error.status);
return;
}
Expand Down
2 changes: 1 addition & 1 deletion src/realm/sync/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,7 @@ SyncClientHookAction SessionImpl::call_debug_hook(const SyncClientHookData& data
auto action = m_wrapper.m_debug_hook(data);
switch (action) {
case realm::SyncClientHookAction::SuspendWithRetryableError: {
SessionErrorInfo err_info(Status{ErrorCodes::RuntimeError, "hook requested error"}, true);
SessionErrorInfo err_info(Status{ErrorCodes::RuntimeError, "hook requested error"}, IsFatal{false});
err_info.server_requests_action = ProtocolErrorInfo::Action::Transient;

auto err_processing_err = receive_error_message(err_info);
Expand Down
4 changes: 2 additions & 2 deletions src/realm/sync/client_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,8 @@ struct SessionErrorInfo : public ProtocolErrorInfo {
{
}

SessionErrorInfo(Status status, bool try_again)
: ProtocolErrorInfo(status.get_std_error_code().value(), status.reason(), try_again)
SessionErrorInfo(Status status, IsFatal is_fatal)
: ProtocolErrorInfo(status.get_std_error_code().value(), status.reason(), is_fatal)
, status(std::move(status))
{
}
Expand Down
49 changes: 20 additions & 29 deletions src/realm/sync/network/default_socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,13 @@ class DefaultWebSocketImpl final : public DefaultWebSocket, public Config {
{
m_logger.error("Reading failed: %1", ec.message()); // Throws
constexpr bool was_clean = false;
websocket_error_and_close_handler(
was_clean, Status{make_error_code(WebSocketError::websocket_read_error), ec.message()});
websocket_error_and_close_handler(was_clean, WebSocketError::websocket_read_error, ec.message());
}
void websocket_write_error_handler(std::error_code ec) override
{
m_logger.error("Writing failed: %1", ec.message()); // Throws
constexpr bool was_clean = false;
websocket_error_and_close_handler(
was_clean, Status{make_error_code(WebSocketError::websocket_write_error), ec.message()});
websocket_error_and_close_handler(was_clean, WebSocketError::websocket_write_error, ec.message());
}
void websocket_handshake_error_handler(std::error_code ec, const HTTPHeaders*,
const std::string_view* body) override
Expand Down Expand Up @@ -144,30 +142,25 @@ class DefaultWebSocketImpl final : public DefaultWebSocket, public Config {
}
}

websocket_error_and_close_handler(was_clean, Status{make_error_code(error), ec.message()});
websocket_error_and_close_handler(was_clean, error, ec.message());
}
void websocket_protocol_error_handler(std::error_code ec) override
{
constexpr bool was_clean = false;
websocket_error_and_close_handler(
was_clean, Status{make_error_code(WebSocketError::websocket_protocol_error), ec.message()});
websocket_error_and_close_handler(was_clean, WebSocketError::websocket_protocol_error, ec.message());
}
bool websocket_close_message_received(std::error_code ec, StringData message) override
bool websocket_close_message_received(WebSocketError code, std::string_view message) override
{
constexpr bool was_clean = true;

// Normal closure.
if (ec.value() == 1000) {
return websocket_error_and_close_handler(was_clean, Status::OK());
}
return websocket_error_and_close_handler(was_clean, Status{ec, message});
return websocket_error_and_close_handler(was_clean, code, message);
}
bool websocket_error_and_close_handler(bool was_clean, Status status)
bool websocket_error_and_close_handler(bool was_clean, WebSocketError code, std::string_view reason)
{
if (!was_clean) {
m_observer->websocket_error_handler();
}
return m_observer->websocket_closed_handler(was_clean, status);
return m_observer->websocket_closed_handler(was_clean, code, reason);
}
bool websocket_binary_message_received(const char* ptr, std::size_t size) override
{
Expand Down Expand Up @@ -275,8 +268,8 @@ void DefaultWebSocketImpl::handle_resolve(std::error_code ec, network::Endpoint:
if (ec) {
m_logger.error("Failed to resolve '%1:%2': %3", m_endpoint.address, m_endpoint.port, ec.message()); // Throws
constexpr bool was_clean = false;
websocket_error_and_close_handler(
was_clean, Status{make_error_code(WebSocketError::websocket_resolve_failed), ec.message()}); // Throws
websocket_error_and_close_handler(was_clean, WebSocketError::websocket_resolve_failed,
ec.message()); // Throws
return;
}

Expand Down Expand Up @@ -316,8 +309,8 @@ void DefaultWebSocketImpl::handle_tcp_connect(std::error_code ec, network::Endpo
// All endpoints failed
m_logger.error("Failed to connect to '%1:%2': All endpoints failed", m_endpoint.address, m_endpoint.port);
constexpr bool was_clean = false;
websocket_error_and_close_handler(
was_clean, Status{make_error_code(WebSocketError::websocket_connection_failed), ec.message()}); // Throws
websocket_error_and_close_handler(was_clean, WebSocketError::websocket_connection_failed,
ec.message()); // Throws
return;
}

Expand Down Expand Up @@ -357,18 +350,16 @@ void DefaultWebSocketImpl::initiate_http_tunnel()
if (ec && ec != util::error::operation_aborted) {
m_logger.error("Failed to establish HTTP tunnel: %1", ec.message());
constexpr bool was_clean = false;
websocket_error_and_close_handler(
was_clean,
Status{make_error_code(WebSocketError::websocket_connection_failed), ec.message()}); // Throws
websocket_error_and_close_handler(was_clean, WebSocketError::websocket_connection_failed,
ec.message()); // Throws
return;
}

if (response.status != HTTPStatus::Ok) {
m_logger.error("Proxy server returned response '%1 %2'", response.status, response.reason); // Throws
constexpr bool was_clean = false;
websocket_error_and_close_handler(
was_clean,
Status{make_error_code(WebSocketError::websocket_connection_failed), response.reason}); // Throws
websocket_error_and_close_handler(was_clean, WebSocketError::websocket_connection_failed,
response.reason); // Throws
return;
}

Expand Down Expand Up @@ -431,15 +422,15 @@ void DefaultWebSocketImpl::handle_ssl_handshake(std::error_code ec)
if (ec) {
REALM_ASSERT(ec != util::error::operation_aborted);
constexpr bool was_clean = false;
std::error_code ec2;
WebSocketError parsed_error_code;
if (ec == network::ssl::Errors::certificate_rejected) {
ec2 = make_error_code(WebSocketError::websocket_tls_handshake_failed);
parsed_error_code = WebSocketError::websocket_tls_handshake_failed;
}
else {
ec2 = make_error_code(WebSocketError::websocket_connection_failed);
parsed_error_code = WebSocketError::websocket_connection_failed;
}

websocket_error_and_close_handler(was_clean, Status{ec2, ec.message()}); // Throws
websocket_error_and_close_handler(was_clean, parsed_error_code, ec.message()); // Throws
return;
}

Expand Down
Loading

0 comments on commit a80ab11

Please sign in to comment.