Skip to content

Commit

Permalink
Use args_from_dict in all camera classes (#257)
Browse files Browse the repository at this point in the history
* feat: remove overwrite of `from_dict`
and use `args_from_dict` in `RtspCamera`

* style: remove now unused import of Self

* add tests for storing and restoring cameras as dicts

* tests: move test of camera functionality into separate camera test

* tests: test base parameters for all types
tests: test calibratable subclasses

* fix: store camera base path

* refactor: move camera storage tests into separate file

* refactor: rename parameter fixture

* check base parameters in all tests

* fix: base path storage

* tests: fix persistence tests (including base path)

* style: remove import line wrap

---------

Co-authored-by: Paula Kammler <paula@zauberzeug.com>
  • Loading branch information
NiklasNeugebauer and codingpaula authored Feb 6, 2025
1 parent a4b7690 commit 48f86c4
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 12 deletions.
2 changes: 2 additions & 0 deletions rosys/vision/camera/camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,13 @@ def get_latest_image_url(self) -> str:
return self.get_image_url(image)

def to_dict(self) -> dict:
base_path_id = self.base_path.replace('images/', '', 1) if self.base_path.startswith('images') else None
return {
'id': self.id,
'name': self.name,
'connect_after_init': self.connect_after_init,
'image_history_length': self.images.maxlen,
'base_path_overwrite': base_path_id if base_path_id != self.id else None,
}

@classmethod
Expand Down
6 changes: 0 additions & 6 deletions rosys/vision/mjpeg_camera/mjpeg_camera.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import logging
from typing import Any

from typing_extensions import Self

from ... import rosys
from ..camera import ConfigurableCamera, TransformableCamera
from ..image import Image
Expand Down Expand Up @@ -57,10 +55,6 @@ def to_dict(self) -> dict:
'ip': self.ip,
}

@classmethod
def from_dict(cls, data: dict) -> Self:
return cls(**data)

@property
def is_connected(self) -> bool:
return (self.device is not None) and self.device.is_connected
Expand Down
2 changes: 1 addition & 1 deletion rosys/vision/rtsp_camera/rtsp_camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def to_dict(self) -> dict[str, Any]:
def from_dict(cls, data: dict[str, Any]) -> Self:
if 'jovision_profile' in data:
data['substream'] = data['jovision_profile']
return cls(**data)
return cls(**cls.args_from_dict(data))

@property
def is_connected(self) -> bool:
Expand Down
5 changes: 0 additions & 5 deletions rosys/vision/usb_camera/usb_camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from typing import Any

import numpy as np
from typing_extensions import Self

from ... import rosys
from ..camera.configurable_camera import ConfigurableCamera
Expand Down Expand Up @@ -46,10 +45,6 @@ def to_dict(self) -> dict[str, Any]:
name: param.value for name, param in self._parameters.items()
}

@classmethod
def from_dict(cls, data: dict[str, Any]) -> Self:
return cls(**data)

@property
def is_connected(self) -> bool:
return self.device is not None
Expand Down
147 changes: 147 additions & 0 deletions tests/test_camera_persistence.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
import pytest

from rosys.geometry import Rectangle
from rosys.vision import (
CalibratableCamera,
Calibration,
Camera,
MjpegCamera,
RtspCamera,
SimulatedCamera,
TransformableCamera,
UsbCamera,
)


@pytest.fixture
def base_camera_parameters() -> dict:
return {
'id': 'test_cam',
'name': 'T3:5T',
'connect_after_init': False,
'base_path_overwrite': 'new_base_path',
'image_history_length': 10
}


def assert_base_camera_parameters_match(camera1: Camera, camera2: Camera) -> None:
assert camera1.id == camera2.id
assert camera1.name == camera2.name
assert camera1.connect_after_init == camera2.connect_after_init
assert camera1.base_path == camera2.base_path
assert camera1.images.maxlen == camera2.images.maxlen


async def test_storing_camera_as_dict(rosys_integration, base_camera_parameters):
camera = Camera(**base_camera_parameters)
await camera.connect()
camera_as_dict = camera.to_dict()
restored_camera = Camera.from_dict(camera_as_dict)
assert isinstance(restored_camera, Camera)
assert_base_camera_parameters_match(camera, restored_camera)


async def test_storing_camera_with_no_basepath_overwrite(rosys_integration, base_camera_parameters):
camera = Camera(id='test_cam', base_path_overwrite=None)
camera_as_dict = camera.to_dict()
restored_camera = Camera.from_dict(camera_as_dict)
assert camera_as_dict['base_path_overwrite'] is None
assert isinstance(restored_camera, Camera)
assert restored_camera.base_path == camera.base_path


async def test_storing_simulated_camera_as_dict(rosys_integration, base_camera_parameters):
camera = SimulatedCamera(width=808, height=606, color='#010101', fps=1, **base_camera_parameters)
await camera.connect()
camera_as_dict = camera.to_dict()
restored_camera = SimulatedCamera.from_dict(camera_as_dict)
assert isinstance(restored_camera, SimulatedCamera)
assert_base_camera_parameters_match(camera, restored_camera)
assert restored_camera.resolution == camera.resolution
assert restored_camera.parameters == camera.parameters


def test_storing_transformable_camera_as_dict(rosys_integration, base_camera_parameters):
camera = TransformableCamera(crop=Rectangle(x=10, y=10, width=100, height=100),
rotation=90, **base_camera_parameters)
camera_as_dict = camera.to_dict()
restored_camera = TransformableCamera.from_dict(camera_as_dict)
assert isinstance(restored_camera, TransformableCamera)
assert_base_camera_parameters_match(camera, restored_camera)
assert restored_camera.crop == camera.crop
assert restored_camera.rotation == camera.rotation
assert restored_camera.rotation_angle == camera.rotation_angle


async def test_storing_mjpeg_camera_as_dict(rosys_integration, base_camera_parameters):
camera = MjpegCamera(ip='192.168.1.1', username='admin', password='admin', fps=1,
resolution=(808, 606), mirrored=True, **base_camera_parameters)
await camera.connect()
camera_as_dict = camera.to_dict()
restored_camera = MjpegCamera.from_dict(camera_as_dict)
assert isinstance(restored_camera, MjpegCamera)
assert_base_camera_parameters_match(camera, restored_camera)
assert restored_camera.parameters == camera.parameters
assert restored_camera.username == camera.username
assert restored_camera.password == camera.password
assert restored_camera.ip == camera.ip
assert restored_camera.mac == camera.mac


def test_storing_rtsp_camera_as_dict(rosys_integration, base_camera_parameters):
camera = RtspCamera(fps=1, substream=2, bitrate=4097, ip='192.168.1.1', **base_camera_parameters)
camera_as_dict = camera.to_dict()
restored_camera = RtspCamera.from_dict(camera_as_dict)
assert isinstance(restored_camera, RtspCamera)
assert_base_camera_parameters_match(camera, restored_camera)
assert restored_camera.parameters == camera.parameters
assert restored_camera.ip == camera.ip


def test_storing_usb_camera_as_dict(rosys_integration, base_camera_parameters):
camera = UsbCamera(auto_exposure=False, exposure=100, width=808, height=606, fps=1, **base_camera_parameters)
camera_as_dict = camera.to_dict()
restored_camera = UsbCamera.from_dict(camera_as_dict)
assert isinstance(restored_camera, UsbCamera)
assert_base_camera_parameters_match(camera, restored_camera)
assert restored_camera.parameters == camera.parameters


def test_storing_calibratable_camera_as_dict(rosys_integration, base_camera_parameters):
camera = CalibratableCamera.create_calibrated(width=808, height=606,
focal_length=577, x=1.0, y=2.0, z=3.0,
**base_camera_parameters)
assert isinstance(camera.calibration, Calibration)
camera_as_dict = camera.to_dict()
restored_camera = CalibratableCamera.from_dict(camera_as_dict)
assert isinstance(restored_camera, CalibratableCamera)
assert isinstance(restored_camera.calibration, Calibration)
assert_base_camera_parameters_match(camera, restored_camera)
assert restored_camera.calibration == camera.calibration


def test_storing_calibratable_camera_subclasses_as_dict(rosys_integration):
calibration = CalibratableCamera.create_calibrated(id='test_cam', width=808, height=606, focal_length=577,
x=1.0, y=2.0, z=3.0).calibration
assert calibration is not None

class CalibratableRtspCamera(CalibratableCamera, RtspCamera):
pass

class CalibratableUsbCamera(CalibratableCamera, UsbCamera):
pass

class CalibratableMjpegCamera(CalibratableCamera, MjpegCamera):
pass

class CalibratableSimulatedCamera(CalibratableCamera, SimulatedCamera):
pass

for camera_class in [CalibratableRtspCamera, CalibratableUsbCamera, CalibratableMjpegCamera, CalibratableSimulatedCamera]:
camera = camera_class(id='test_cam')
camera.calibration = calibration
camera_as_dict = camera.to_dict()
restored_camera = camera_class.from_dict(camera_as_dict)
assert isinstance(restored_camera, camera_class)
assert restored_camera.id == camera.id
assert restored_camera.calibration == camera.calibration

0 comments on commit 48f86c4

Please sign in to comment.