From 83b0708e821ae3c9ceccc1c14a30c8b052a659e0 Mon Sep 17 00:00:00 2001 From: Rodja Trappe Date: Wed, 13 Dec 2023 10:47:07 +0100 Subject: [PATCH] cleanup --- rosys/analysis/timelapse_recorder.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/rosys/analysis/timelapse_recorder.py b/rosys/analysis/timelapse_recorder.py index a8b8fe736..8354c4528 100644 --- a/rosys/analysis/timelapse_recorder.py +++ b/rosys/analysis/timelapse_recorder.py @@ -44,9 +44,9 @@ def __init__(self, *, width: int = 800, height: int = 600, capture_rate=1) -> No self._notifications: list[list[str]] = [] self.camera: Optional[Camera] = None VIDEO_PATH.mkdir(parents=True, exist_ok=True) - rosys.on_repeat(self.capture, 0.01) + rosys.on_repeat(self._capture, 0.01) - async def capture(self) -> None: + async def _capture(self) -> None: if self.camera is None: return if rosys.time() - self.last_capture_time < 1 / self.capture_rate: @@ -57,6 +57,7 @@ async def capture(self) -> None: await self.save(images[-1]) async def save(self, image: RosysImage) -> None: + '''Captures an image to be used in video.''' await rosys.run.cpu_bound(_save_image, image, STORAGE_PATH, @@ -64,6 +65,7 @@ async def save(self, image: RosysImage) -> None: self._notifications.pop(0) if self._notifications else []) async def compress_video(self) -> None: + '''Creates a video from the captured images''' jpgs = sorted(STORAGE_PATH.glob('*.jpg')) if len(jpgs) < 20: self.log.info(f'very few images ({len(jpgs)}); not creating video') @@ -92,11 +94,13 @@ async def compress_video(self) -> None: rosys.background_tasks.create(rosys.run.sh(cmd, timeout=None, shell=True), name='timelapse ffmpeg') def discard_video(self) -> None: + '''Drop the currently recorded video data.''' for jpg in STORAGE_PATH.glob('*.jpg'): jpg.unlink() async def create_info(self, title: str, subtitle: str, frames: int = 20, time: Optional[float] = None) -> None: - await rosys.run.cpu_bound(save_info, title, subtitle, rosys.time() if time is None else time, frames) + '''Shows info on black screen with title and subtitle''' + await rosys.run.cpu_bound(_save_info, title, subtitle, rosys.time() if time is None else time, frames) def notify(self, message: str, frames: int = 10): '''Place message in the next n frames''' @@ -112,14 +116,14 @@ def _save_image(image: RosysImage, path: Path, size: tuple[int, int], notificati img = img.resize(size) draw = ImageDraw.Draw(img) x = y = 20 - write(f'{datetime.fromtimestamp(image.time).strftime(r"%Y-%m-%d %H:%M:%S")}, cam {image.camera_id}', draw, x, y) + _write(f'{datetime.fromtimestamp(image.time).strftime(r"%Y-%m-%d %H:%M:%S")}, cam {image.camera_id}', draw, x, y) for message in notifications: y += 30 - write(message, draw, x, y) + _write(message, draw, x, y) img.save(path / f'{int(image.time* 10000)}.jpg', 'JPEG') -def write(text: str, draw: ImageDraw.Draw, x: int, y: int): +def _write(text: str, draw: ImageDraw.Draw, x: int, y: int): # shadow draw.text((x - 1, y - 1), text, font=IMAGE_FONT, fill=(0, 0, 0)) draw.text((x + 1, y - 1), text, font=IMAGE_FONT, fill=(0, 0, 0)) @@ -129,7 +133,7 @@ def write(text: str, draw: ImageDraw.Draw, x: int, y: int): draw.text((x, y), text, font=IMAGE_FONT, fill=(255, 255, 255)) -def save_info(title: str, subtitle: str, time: float, frames: int) -> None: +def _save_info(title: str, subtitle: str, time: float, frames: int) -> None: for i in range(frames): img = Image.new('RGB', (1600, 1200), 'black') draw = ImageDraw.Draw(img)