Skip to content

Commit 5b55417

Browse files
committed
add basic websocket server for notifications
1 parent 8999861 commit 5b55417

File tree

14 files changed

+684
-1
lines changed

14 files changed

+684
-1
lines changed

.github/workflows/docker-images.yaml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ jobs:
1818
registry: ghcr.io
1919
username: ${{ github.actor }}
2020
password: ${{ secrets.GITHUB_TOKEN }}
21+
2122
- name: "Tasks API: Build and push Docker image"
2223
uses: docker/build-push-action@v6
2324
with:
@@ -26,6 +27,7 @@ jobs:
2627
tags: |
2728
ghcr.io/${{ github.repository }}/tasks:latest
2829
ghcr.io/${{ github.repository }}/tasks:${{ github.sha }}
30+
2931
- name: "Frontend: Build and push Docker image"
3032
uses: docker/build-push-action@v6
3133
with:
@@ -34,3 +36,12 @@ jobs:
3436
tags: |
3537
ghcr.io/${{ github.repository }}/frontend:latest
3638
ghcr.io/${{ github.repository }}/frontend:${{ github.sha }}
39+
40+
- name: "Websocket Server: Build and push Docker image"
41+
uses: docker/build-push-action@v6
42+
with:
43+
push: ${{ github.event_name != 'pull_request' }}
44+
context: backend/websocket-server
45+
tags: |
46+
ghcr.io/${{ github.repository }}/websocket-server:latest
47+
ghcr.io/${{ github.repository }}/websocket-server:${{ github.sha }}

README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ and deploy a modern web application with several different services in the backg
88
This is an overview. You will find more detailed explanations for the components in their own Readme. The components are intentionally written in different languages and frameworks. First, we wanted to showcase the interactions between different kinds of applications, and second, we wanted to learn and have fun.
99

1010
- **[Frontend](./frontend/)** A simple Svelte client application.
11-
- **[Tasks API](./backend/tasks/)** The API for interacting with tasks, written in Go, using the Fiber framework.
11+
- **[Tasks API](./backend/tasks/)** The API for interacting with tasks, written in Go, using the Fiber framework. Changes to tasks are published as events to Redis.
12+
- **[WebsocketServer](./backend/websocket-server/)** Listens for events in Redis and broadcasts them to all connected Websocket clients. The frontend connects to it by default.
1213

1314
## Services
1415

@@ -49,11 +50,13 @@ graph LR;
4950
subgraph Backend
5051
TasksAPI-- :27017 --> MongoDB
5152
TasksAPI-- :6379 --> Redis
53+
WebsocketServer-- :6379 -->Redis
5254
end
5355
5456
subgraph lb[Load Balancer]
5557
Traeffik-- :5173 --> Frontend
5658
Traeffik-- :3000 --> TasksAPI
59+
Traeffik-- :8765 --> WebsocketServer
5760
end
5861
5962

apps.compose.yaml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,19 @@ services:
4141
- "traefik.http.services.tasks.loadbalancer.server.port=3000"
4242
networks:
4343
- internal
44+
45+
websocket-server:
46+
build:
47+
context: backend/websocket-server
48+
environment:
49+
- REDIS_URL=redis://redis:6379/0
50+
- WEBSOCKET_SERVER_HOST=0.0.0.0
51+
labels:
52+
- "traefik.enable=true"
53+
- "traefik.http.routers.websocket-server.rule=PathPrefix(`/ws`)"
54+
- "traefik.http.routers.websocket-server.entrypoints=web"
55+
- "traefik.http.services.websocket-server.loadbalancer.server.port=8765"
56+
volumes:
57+
- ./backend/websocket-server:/app
58+
networks:
59+
- internal

backend/websocket-server/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
__pycache__

backend/websocket-server/Dockerfile

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
FROM python:3.13-alpine
2+
3+
WORKDIR /app
4+
5+
RUN pip install poetry
6+
7+
COPY . /app
8+
RUN poetry install
9+
10+
CMD [ "poetry", "run", "python", "main.py" ]

backend/websocket-server/README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
# Websocket Server
2+
3+
This app is a websocket server, which broadcasts messages it receives from the message queue to all connected clients.
4+
5+
It is a Python app with Poetry. You can run it like this:
6+
7+
```sh
8+
poetry install
9+
poetry run python main.py
10+
```
11+
12+
Or just run it via `docker-compose` from the main directory.

backend/websocket-server/app/__init__.py

Whitespace-only changes.
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import asyncio
2+
import logging
3+
from typing import Callable
4+
5+
import redis.asyncio
6+
from redis.asyncio import from_url
7+
from tenacity import retry, retry_if_exception_type, wait_fixed
8+
9+
RETRY_INTERVALL = 2
10+
11+
12+
async def redis_reader(channel: redis.asyncio.client.PubSub, callback: Callable[[str], None]):
13+
while True:
14+
message = await channel.get_message(ignore_subscribe_messages=True)
15+
16+
if message and message["data"]:
17+
data = message["data"].decode("utf-8")
18+
logging.info(f"Received data from Redis: {data}")
19+
callback(data)
20+
21+
22+
@retry(wait=wait_fixed(RETRY_INTERVALL), retry=retry_if_exception_type(redis.exceptions.ConnectionError))
23+
async def start_redis_subscription(url: str, callback: Callable[[str], None]):
24+
client = from_url(url)
25+
async with client.pubsub() as pubsub:
26+
try:
27+
await pubsub.psubscribe("events.*")
28+
await asyncio.create_task(redis_reader(pubsub, callback))
29+
except redis.exceptions.ConnectionError as e:
30+
logging.error(f"Connection to Redis failed, trying again in {RETRY_INTERVALL} seconds.")
31+
raise e
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import asyncio
2+
import logging
3+
from typing import Set
4+
5+
import websockets.asyncio.connection
6+
from websockets.asyncio.server import ServerConnection, serve
7+
8+
connected_clients: Set[ServerConnection] = set()
9+
10+
11+
def broadcast(message: str):
12+
websockets.asyncio.connection.broadcast(connected_clients.copy(), message)
13+
14+
15+
async def start_websocket_server(host: str, port: int):
16+
async with serve(websocket_handler, host, port):
17+
await asyncio.get_running_loop().create_future()
18+
19+
20+
async def websocket_handler(websocket: ServerConnection):
21+
logging.info(f"Websocket client connected from {websocket.remote_address[0]}")
22+
connected_clients.add(websocket)
23+
24+
async for message in websocket:
25+
logging.info(f"Client sent a message: {str(message)}")
26+
27+
connected_clients.remove(websocket)

backend/websocket-server/main.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import asyncio
2+
import logging
3+
import os
4+
from signal import SIGINT, SIGTERM
5+
6+
from app.pubsub import start_redis_subscription
7+
from app.websockets import broadcast, start_websocket_server
8+
9+
logging.basicConfig(
10+
format="%(asctime)s %(message)s",
11+
level=logging.INFO,
12+
)
13+
14+
host = os.getenv("WEBSOCKET_SERVER_HOST", "localhost")
15+
port = int(os.getenv("WEBSOCKET_SERVER_PORT", 8765))
16+
redis_url = os.getenv("WEBSOCKET_SERVER_REDIS_URL", "redis://redis")
17+
18+
19+
def send_message_to_all(message: str):
20+
broadcast(message)
21+
22+
23+
async def main():
24+
async with asyncio.TaskGroup() as tg:
25+
tg.create_task(start_websocket_server(host, port))
26+
tg.create_task(start_redis_subscription(redis_url, send_message_to_all))
27+
28+
29+
if __name__ == "__main__":
30+
logging.info("🎸 Websocket server started.")
31+
32+
loop = asyncio.new_event_loop()
33+
asyncio.set_event_loop(loop)
34+
main_task = asyncio.ensure_future(main())
35+
for signal in [SIGINT, SIGTERM]:
36+
loop.add_signal_handler(signal, main_task.cancel)
37+
38+
try:
39+
loop.run_until_complete(main_task)
40+
except asyncio.exceptions.CancelledError:
41+
pass
42+
finally:
43+
loop.close()

0 commit comments

Comments
 (0)