Skip to content

Commit

Permalink
Improve websocket enable/disable functionality (#57)
Browse files Browse the repository at this point in the history
* Improve websocket enable/disable functionality

* Add websocket_enabled decorator to use for endpoints

- wrapped websocket manager will return an empty function as attr when websockets are disabled.

* Move empty handler outside websocket_enabled decorator
  • Loading branch information
tjeerddie authored Nov 8, 2021
1 parent 545067b commit d93ed71
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 64 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export WEBSOCKET_BROADCASTER_URL="redis://localhost:6379"

Websockets can also be turned off with:
```shell
export WEBSOCKETS_ON=False
export ENABLE_WEBSOCKETS=False
```

more broadcaster info [here](https://pypi.org/project/broadcaster/)
Expand Down
8 changes: 3 additions & 5 deletions orchestrator/api/api_v1/endpoints/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
from orchestrator.types import JSON
from orchestrator.utils.json import json_dumps
from orchestrator.utils.show_process import show_process
from orchestrator.websocket import WS_CHANNELS, is_process_active, websocket_manager
from orchestrator.websocket import WS_CHANNELS, is_process_active, websocket_enabled, websocket_manager
from orchestrator.workflow import ProcessStatus

router = APIRouter()
Expand Down Expand Up @@ -345,9 +345,8 @@ def processes_filterable(


@router.websocket("/all/")
@websocket_enabled
async def websocket_process_list(websocket: WebSocket, token: str = Query(...)) -> None:
if not websocket_manager.on:
return
error = await websocket_manager.authorize(websocket, token)

await websocket.accept()
Expand All @@ -362,9 +361,8 @@ async def websocket_process_list(websocket: WebSocket, token: str = Query(...))


@router.websocket("/{pid}")
@websocket_enabled
async def websocket_process_detail(websocket: WebSocket, pid: UUID, token: str = Query(...)) -> None:
if not websocket_manager.on:
return
error = await websocket_manager.authorize(websocket, token)

await websocket.accept()
Expand Down
2 changes: 1 addition & 1 deletion orchestrator/services/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def _db_log_step(stat: ProcessStat, step: Step, process_state: WFProcess) -> WFP
db.session.rollback()
raise

if websocket_manager.on:
if websocket_manager.enabled:
websocket_data = create_process_step_websocket_data(p, current_step, step.form)
send_process_step_data_to_websocket(p.pid, websocket_data)

Expand Down
2 changes: 1 addition & 1 deletion orchestrator/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class AppSettings(BaseSettings):
TRACING_ENABLED: bool = False
TRANSLATIONS_DIR: Optional[Path] = None
WEBSOCKET_BROADCASTER_URL: str = "memory://"
WEBSOCKETS_ON: bool = True
ENABLE_WEBSOCKETS: bool = True


class Oauth2Settings(BaseSettings):
Expand Down
40 changes: 29 additions & 11 deletions orchestrator/websocket/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# limitations under the License.

from asyncio import new_event_loop
from functools import wraps
from typing import Any, Dict, Optional, cast
from urllib.parse import urlparse
from uuid import UUID
Expand Down Expand Up @@ -40,20 +41,17 @@ def SINGLE_PROCESS(pid: UUID) -> str:
return f"process_detail:{pid}"


async def empty_fn(*args: tuple, **kwargs: dict[str, Any]) -> None:
return


class WrappedWebSocketManager:
def __init__(self, wrappee: Optional[WebSocketManager] = None) -> None:
self.wrapped_websocket_manager = wrappee

def update(self, wrappee: WebSocketManager) -> None:
self.wrapped_websocket_manager = wrappee
if self.wrapped_websocket_manager.on:
logger.warning(
"WebSocketManager object configured, all methods referencing `websocket_manager` should work."
)
else:
logger.warning(
"WebSockets are turned off, WebSocketManager object configured with all methods referencing `websocket_manager` only logging its turned off"
)
logger.warning("WebSocketManager object configured, all methods referencing `websocket_manager` should work.")

def __getattr__(self, attr: str) -> Any:
if not isinstance(self.wrapped_websocket_manager, WebSocketManager):
Expand All @@ -63,6 +61,9 @@ def __getattr__(self, attr: str) -> Any:
raise RuntimeWarning(
"No WebSocketManager configured at this time. Please pass WebSocketManager configuration to OrchestratorCore base_settings"
)
if attr != "enabled" and not self.wrapped_websocket_manager.enabled:
logger.warning("Websockets are disabled, unable to access class methods")
return empty_fn

return getattr(self.wrapped_websocket_manager, attr)

Expand All @@ -74,7 +75,7 @@ def __getattr__(self, attr: str) -> Any:

# The Global WebSocketManager is set after calling this function
def init_websocket_manager(settings: AppSettings) -> WebSocketManager:
wrapped_websocket_manager.update(WebSocketManager(settings.WEBSOCKETS_ON, settings.WEBSOCKET_BROADCASTER_URL))
wrapped_websocket_manager.update(WebSocketManager(settings.ENABLE_WEBSOCKETS, settings.WEBSOCKET_BROADCASTER_URL))
return websocket_manager


Expand Down Expand Up @@ -115,11 +116,28 @@ def send_process_step_data_to_websocket(pid: UUID, data: Dict) -> None:
pass


async def empty_handler() -> None:
return


def websocket_enabled(handler: Any) -> Any:
@wraps(handler)
@wraps(empty_handler)
async def wrapper(*args: tuple, **kwargs: dict[str, Any]) -> Any:
if websocket_manager.enabled:
return await handler(*args, **kwargs)
else:
return await empty_handler()

return wrapper


__all__ = [
"websocket_manager",
"init_websocket_manager",
"create_websocket_data",
"send_process_step_data_to_websocket",
"create_process_step_websocket_data",
"is_process_active",
"send_process_step_data_to_websocket",
"websocket_enabled",
"WS_CHANNELS",
]
39 changes: 0 additions & 39 deletions orchestrator/websocket/managers/websocket_manager_off.py

This file was deleted.

9 changes: 3 additions & 6 deletions orchestrator/websocket/websocket_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,18 @@
from orchestrator.security import oidc_user, opa_security_default
from orchestrator.websocket.managers.broadcast_websocket_manager import BroadcastWebsocketManager
from orchestrator.websocket.managers.memory_websocket_manager import MemoryWebsocketManager
from orchestrator.websocket.managers.websocket_manager_off import WebsocketManagerOff

logger = get_logger(__name__)


class WebSocketManager:
_backend: Union[MemoryWebsocketManager, BroadcastWebsocketManager]

def __init__(self, websockets_on: bool, broadcast_url: str):
self.on = websockets_on
def __init__(self, websockets_enabled: bool, broadcast_url: str):
self.enabled = websockets_enabled
self.broadcaster_type = urlparse(broadcast_url).scheme
self.connected = False
if not self.on:
self._backend = WebsocketManagerOff()
elif self.broadcaster_type == "redis":
if self.broadcaster_type == "redis":
self._backend = BroadcastWebsocketManager(broadcast_url)
else:
self._backend = MemoryWebsocketManager()
Expand Down

0 comments on commit d93ed71

Please sign in to comment.