Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 113 additions & 9 deletions src/motion_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
from viam.services.vision import CaptureAllResult, Vision
from viam.utils import ValueTypes

LOGGER = getLogger("MotionDetectorLogger")


class MotionDetector(Vision, Reconfigurable):
"""
Expand All @@ -39,6 +37,12 @@ class MotionDetector(Vision, Reconfigurable):

def __init__(self, name: str):
    """Construct the motion detector vision service.

    Args:
        name: resource name assigned to this service instance.
    """
    super().__init__(name=name)
    # Per-instance logger (replaces the old module-level LOGGER).
    self.logger = getLogger(__name__)
    # Legacy two-frames-per-call differencing is the default; reconfigure()
    # may flip this on via the "cache_image" config attribute.
    self.cache_image = False
    # Grayscale frame retained from the most recent call when caching is
    # enabled; None until at least one frame has been seen.
    self._last_gray = None

# Constructor
@classmethod
Expand Down Expand Up @@ -160,6 +164,33 @@ def reconfigure(
else:
self.crop_region = None

# If cache_image is true, this service will perform temporal differencing
# against the previously-seen frame instead of grabbing two frames per call.
#
# If the attribute is missing or false, we fall back to the original behavior.
cache_image_flag = config.attributes.fields.get("cache_image")
self.cache_image = bool(cache_image_flag.bool_value) if cache_image_flag else False

# Reconfiguration invalidates the cached frame:
# - camera may have changed
# - crop region may have changed
# - sensitivity may have changed
self._last_gray = None
self.logger.debug(
"MotionDetector reconfigured",
extra={
"camera_name": self.cam_name,
"sensitivity": self.sensitivity,
"cache_image": self.cache_image,
"min_box_size": self.min_box_size,
"min_box_percent": self.min_box_percent,
"max_box_size": self.max_box_size,
"max_box_percent": self.max_box_percent,
"crop_region": self.crop_region,
},
)


# This will be the main method implemented in this module.
# Given a camera. Perform frame differencing and return how much of the image is moving
async def get_classifications(
Expand All @@ -171,11 +202,43 @@ async def get_classifications(
timeout: Optional[float] = None,
**kwargs,
) -> List[Classification]:
# If caching is enabled, only grab a single image and compare it
# to the previously-cached frame.
if self.cache_image:
self.logger.debug(
"cache_image=True → grabbing SINGLE frame (Classification)"
)
images, _ = await self.camera.get_images()
if not images:
raise ValueError("No images returned by get_images")
input_img = images[0]

img = pil.viam_to_pil_image(input_img)
img, _, _ = self.crop_image(img)
gray = cv2.cvtColor(np.array(img), cv2.COLOR_BGR2GRAY)

# If this is the first frame we have ever seen, there is no
# valid comparison to make yet.
if self._last_gray is None:
self.logger.debug(
"No previous frame available; skipping motion detection classification"
)
self._last_gray = gray
return []

# Perform motion detection by differencing the previous frame
# against the current frame.
result = self.classification_from_gray_imgs(self._last_gray, gray)
# Update the cache so the next call compares against this frame.
self._last_gray = gray
return result

# Grab and grayscale 2 images
images, _ = await self.camera.get_images()
if images is None or len(images) == 0:
self.logger.debug("cache_image=False → grabbing FIRST frame")
images1, _ = await self.camera.get_images()
if not images1:
raise ValueError("No images returned by get_images")
input1 = images[0]
input1 = images1[0]
if input1.mime_type not in [CameraMimeType.JPEG, CameraMimeType.PNG]:
raise ValueError(
"image mime type must be PNG or JPEG, not ", input1.mime_type
Expand All @@ -184,10 +247,11 @@ async def get_classifications(
img1, _, _ = self.crop_image(img1)
gray1 = cv2.cvtColor(np.array(img1), cv2.COLOR_BGR2GRAY)

camera_images, _ = await self.camera.get_images()
if camera_images is None or len(camera_images) == 0:
raise ValueError("No images were returned by get_images")
input2 = camera_images[0]
self.logger.debug("cache_image=False → grabbing SECOND frame")
images2, _ = await self.camera.get_images()
if not images2:
raise ValueError("No images returned by get_images")
input2 = images2[0]
if input2.mime_type not in [CameraMimeType.JPEG, CameraMimeType.PNG]:
raise ValueError(
"image mime type must be PNG or JPEG, not ", input2.mime_type
Expand Down Expand Up @@ -227,6 +291,46 @@ async def get_detections(
timeout: Optional[float] = None,
**kwargs,
) -> List[Detection]:
# If caching is enabled, only grab a single image and compare it
# to the previously-cached frame.
if self.cache_image:
self.logger.debug(
"cache_image=True → grabbing SINGLE frame (Detection)"
)
images, _ = await self.camera.get_images()
if not images:
raise ValueError("No images returned by get_images")
input_img = images[0]
if input_img.mime_type not in [CameraMimeType.JPEG, CameraMimeType.PNG]:
raise ValueError(
f"image mime type must be PNG or JPEG, not {input_img.mime_type}"
)


img = pil.viam_to_pil_image(input_img)
img, width, height = self.crop_image(img)
gray = cv2.cvtColor(np.array(img), cv2.COLOR_BGR2GRAY)

# If this is the first frame we have ever seen, there is no
# valid comparison to make yet.
if self._last_gray is None:
self.logger.debug(
"No previous frame available; skipping motion detection detections"
)
self._last_gray = gray
return []

# Perform detection using frame differencing against
# the cached previous frame.
detections = self.detections_from_gray_imgs(
self._last_gray, gray, width, height
)

# Update the cache so the next call compares against this frame.
self._last_gray = gray
return detections


# Grab and grayscale 2 images
images, _ = await self.camera.get_images()
if images is None or len(images) == 0:
Expand Down