Skip to content

Commit

Permalink
Disconnect old websockets and avoid duplicating ws during hot reload
Browse files Browse the repository at this point in the history
  • Loading branch information
masenf committed Dec 20, 2024
1 parent 77fe285 commit 973e114
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 8 deletions.
10 changes: 9 additions & 1 deletion reflex/.templates/web/utils/state.js
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,11 @@ export const connect = async (
// Get backend URL object from the endpoint.
const endpoint = getBackendURL(EVENTURL);

// Disconnect old socket
if (socket.current && socket.current.connected) {
socket.current.disconnect();
}

// Create the socket.
socket.current = io(endpoint.href, {
path: endpoint["pathname"],
Expand Down Expand Up @@ -429,6 +434,7 @@ export const connect = async (
socket.current.on("connect", () => {
setConnectErrors([]);
window.addEventListener("pagehide", pagehideHandler);
document.addEventListener("visibilitychange", checkVisibility);
});

socket.current.on("connect_error", (error) => {
Expand All @@ -438,7 +444,10 @@ export const connect = async (
// When the socket disconnects reset the event_processing flag
socket.current.on("disconnect", () => {
event_processing = false;
socket.current.io.skipReconnect = true;
socket.current = null;
window.removeEventListener("pagehide", pagehideHandler);
document.removeEventListener("visibilitychange", checkVisibility);
});

// On each received message, queue the updates and events.
Expand All @@ -457,7 +466,6 @@ export const connect = async (
queueEvents([...initialEvents(), event], socket);
});

document.addEventListener("visibilitychange", checkVisibility);
};

/**
Expand Down
5 changes: 5 additions & 0 deletions reflex/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

from typing_extensions import Annotated, get_type_hints

from reflex.utils.console import set_log_level
from reflex.utils.exceptions import ConfigError, EnvironmentVarValueError
from reflex.utils.types import GenericType, is_union, value_inside_optional

Expand Down Expand Up @@ -599,6 +600,7 @@ class Config(Base):
class Config:
"""Pydantic config for the config."""

use_enum_values = False
validate_assignment = True

# The name of the app (should match the name of the app directory).
Expand Down Expand Up @@ -718,6 +720,9 @@ def __init__(self, *args, **kwargs):
self._non_default_attributes.update(kwargs)
self._replace_defaults(**kwargs)

# Set the log level for this process
set_log_level(self.loglevel)

if (
self.state_manager_mode == constants.StateManagerMode.REDIS
and not self.redis_url
Expand Down
24 changes: 18 additions & 6 deletions reflex/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator
from typing import Any, AsyncGenerator
from urllib.parse import urlparse

import aiohttp
Expand Down Expand Up @@ -34,32 +34,44 @@ async def proxy_middleware(*args, **kwargs) -> AsyncGenerator[None, None]:
"""
yield
else:
MAX_PROXY_RETRY = 25

async def proxy_http_with_retry(
*,
context: ProxyContext,
scope: Scope,
receive: Receive,
send: Send,
) -> None:
) -> Any:
"""Proxy an HTTP request with retries.
Args:
context: The proxy context.
scope: The ASGI scope.
scope: The request scope.
receive: The receive channel.
send: The send channel.
Returns:
The response from `proxy_http`.
"""
for _attempt in range(100):
for _attempt in range(MAX_PROXY_RETRY):
try:
return await proxy_http(
context=context, scope=scope, receive=receive, send=send
context=context,
scope=scope,
receive=receive,
send=send,
)
except aiohttp.client_exceptions.ClientError as err: # noqa: PERF203
except aiohttp.ClientError as err: # noqa: PERF203
console.debug(
f"Retrying request {scope['path']} due to client error {err!r}."
)
await asyncio.sleep(0.3)
except Exception as ex:
console.debug(
f"Retrying request {scope['path']} due to unhandled exception {ex!r}."
)
await asyncio.sleep(0.3)

def _get_proxy_app_with_context(frontend_host: str) -> tuple[ProxyContext, ASGIApp]:
"""Get the proxy app with the given frontend host.
Expand Down
7 changes: 6 additions & 1 deletion reflex/utils/console.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from __future__ import annotations

import os

from rich.console import Console
from rich.progress import MofNCompleteColumn, Progress, TimeElapsedColumn
from rich.prompt import Prompt
Expand All @@ -12,7 +14,7 @@
_console = Console()

# The current log level.
_LOG_LEVEL = LogLevel.INFO
_LOG_LEVEL = LogLevel.DEFAULT

# Deprecated features who's warning has been printed.
_EMITTED_DEPRECATION_WARNINGS = set()
Expand Down Expand Up @@ -61,6 +63,9 @@ def set_log_level(log_level: LogLevel):
raise ValueError(f"Invalid log level: {log_level}") from ae

global _LOG_LEVEL
if log_level != _LOG_LEVEL:
# Set the loglevel persistently for subprocesses
os.environ["LOGLEVEL"] = log_level.value
_LOG_LEVEL = log_level


Expand Down

0 comments on commit 973e114

Please sign in to comment.