Skip to content

Commit

Permalink
Merge pull request #110 from Idein/feature/localvideocast
Browse files Browse the repository at this point in the history
support local_video_server
  • Loading branch information
amutake authored Jan 21, 2025
2 parents c7de22b + f5931ae commit aa9c485
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Unreleased

- Add `LocalVideoServer` which can cast mjpeg streaming over http.

## 2.9.0 (2024-12-02)

- Add support autofocus function for `imx708` sensor.
Expand Down
1 change: 1 addition & 0 deletions actfw_core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from . import autofocus, capture, linux, system, task, unicam_isp_capture # noqa: F401
from .application import Application # noqa: F401
from .command_server import CommandServer # noqa: F401
from .local_video_server import LocalVideoServer # noqa: F401
from .service_client import ServiceClient # noqa: F401


Expand Down
141 changes: 141 additions & 0 deletions actfw_core/local_video_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import http.server
import io
import socketserver
import threading
from typing import Any, Generic, Optional, TypeVar

from PIL.Image import Image as PIL_Image

from .task import Isolated

PORT = 5100

T = TypeVar("T")


class _ObservableValue(Generic[T]):
def __init__(self) -> None:
self.value: Optional[T] = None
self.condition = threading.Condition()

def wait_new_value(self) -> T:
with self.condition:
self.condition.wait()
if self.value is None:
raise RuntimeError("No value has been set")
return self.value

def set(self, new_value: T) -> None:
with self.condition:
self.value = new_value
self.condition.notify_all()


class _LocalVideoStreamHandler(http.server.BaseHTTPRequestHandler):
def __init__(
self,
image: _ObservableValue[PIL_Image],
quality: int,
*args: Any,
) -> None:
self.image = image
self.quality = quality
super().__init__(*args)

def log_message(self, format: str, *args: Any) -> None:
"""Suppress log messages"""
pass

def do_GET(self) -> None:
self.send_response(200)
self.send_header("Age", str(0))
self.send_header("Cache-Control", "no-cache, private")
self.send_header("Pragma", "no-cache")
self.send_header("Content-Type", "multipart/x-mixed-replace; boundary=FRAME")
self.end_headers()
try:
while True:
try:
frame = self.image.wait_new_value()
except Exception:
continue
else:
jpgimg = io.BytesIO()
frame.save(
jpgimg,
format="JPEG",
quality=self.quality,
)
self.wfile.write(b"--FRAME\r\n")
self.wfile.write(b"Content-Type: image/jpeg\r\n\r\n")
self.wfile.write(jpgimg.getvalue())
self.wfile.write(b"\r\n")
except Exception:
pass


class _LocalVideoStreamServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
allow_reuse_address = True
daemon_threads = True


class LocalVideoServer(Isolated):
image: _ObservableValue[PIL_Image]
server: _LocalVideoStreamServer

"""Local Video Server
This server provides a local video stream on port 5100.
"""

def __init__(
self,
quality: int = 75, # 75 is the default value of PIL JPEG quality
) -> None:
"""
Initialize a LocalVideoServer instance.
Args:
quality (int, optional):
The image quality setting. Acceptable values range from 0 to 100.
- 0: Lowest quality
- 95: Highest quality
- 100: JPEG lossless (no compression)
The `quality` parameter corresponds to the `quality` parameter in PIL's `Image.save` method.
Defaults to `75`.
"""
super().__init__()
self.image = _ObservableValue()

def handler(*args: Any) -> _LocalVideoStreamHandler:
return _LocalVideoStreamHandler(self.image, quality, *args)

self.server = _LocalVideoStreamServer(("", PORT), handler)

def update_image(self, image: PIL_Image) -> None:
"""
Update the video image.
Args:
image (:class:`~PIL.Image`): image
"""

try:
self.image.set(image.copy())
except Exception:
pass

def run(self) -> None:
self.server.serve_forever()

def stop(self) -> None:
super().stop()
self.server.shutdown()
self.server.server_close()
2 changes: 2 additions & 0 deletions tests/test_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
"from_, import_",
[
("actfw_core", "Application"),
("actfw_core", "CommandServer"),
("actfw_core", "LocalVideoServer"),
("actfw_core.capture", "V4LCameraCapture"),
("actfw_core.task", "Consumer, Isolated, Join, Pipe, Producer, Task, Tee"),
],
Expand Down
33 changes: 33 additions & 0 deletions tests/unit_test/test_local_video_server_no_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#!/usr/bin/python3
import threading
import time
from typing import Any

import actfw_core


def test_local_video_server_terminate() -> None:
# try to catch assert error
exception = None

def excepthook(args: Any) -> None:
nonlocal exception
exception = args.exc_value

threading.excepthook = excepthook

# Actcast application
app = actfw_core.Application()

# LocalVideoServer (for mjpeg streaming over http)
local_video_server = actfw_core.LocalVideoServer(quality=75)
app.register_task(local_video_server)

# Start application
th = threading.Thread(target=lambda: app.run())
th.start()
# terminate LocalVideoServer task without calling `update_image`
time.sleep(0.1)
app.stop()
th.join()
assert exception is None

0 comments on commit aa9c485

Please sign in to comment.