Skip to content

Commit

Permalink
Further cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
uellenberg committed Jan 17, 2025
1 parent 9cf64e8 commit 451bc55
Showing 1 changed file with 56 additions and 53 deletions.
109 changes: 56 additions & 53 deletions src/camera/camera/roscamera.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from pathlib import Path

import cv2 # OpenCV library
import rclpy # Python Client Library for ROS 2
from cv_bridge import CvBridge # Package to convert between ROS and OpenCV Images
Expand All @@ -8,30 +6,30 @@
from sensor_msgs.msg import CompressedImage # Image is the message type
from std_msgs.msg import String

"""
Function returns a list of working camera IDs to capture every camera connected to the robot
"""


def getCameras() -> list[int]:
nonWorkingPorts = 0
devPort = 0
workingPorts = []
"""
Function returns a list of working camera IDs to capture every camera connected to the robot
"""

while nonWorkingPorts < 6:
camera = cv2.VideoCapture(devPort)
non_working_ports = 0
dev_port = 0
working_ports = []

while non_working_ports < 6:
camera = cv2.VideoCapture(dev_port)
if not camera.isOpened():
nonWorkingPorts += 1
non_working_ports += 1
else:
isReading, img = camera.read()
is_reading, img = camera.read()
_ = camera.get(3)
_ = camera.get(4)
if isReading:
workingPorts.append(devPort)
if is_reading:
working_ports.append(dev_port)

devPort += 1
dev_port += 1

return workingPorts
return working_ports


class RosCamera(Node):
Expand All @@ -50,10 +48,11 @@ def __init__(self, topicName: str, camera: int):
# ex. topic message: "0 off", "0 on"
# The topic message should always have the corresponding camera number
# at the first index of each message.
subTopicName = "camera_control"
self.subscription = self.create_subscription(String, subTopicName, self.setCameraState, 10)
self.subscription
self.get_logger().info("Created Subscription to" + subTopicName)
sub_topic_name = "camera_control"
self.subscription = self.create_subscription(
String, sub_topic_name, self.setCameraState, 10
)
self.get_logger().info("Created Subscription to" + sub_topic_name)

# We will publish a message every 0.1 seconds
# timer_period = 0.1 # seconds
Expand All @@ -76,49 +75,49 @@ def __init__(self, topicName: str, camera: int):
self.declare_parameter("cameraNumber", camera)
self.declare_parameter("isOn", True)

"""
Callback function publishes a frame captured from a camera to /video_framesX (X is specific
camera ID) every 0.1 seconds
"""
def publishCameraFrame(self) -> None:
"""
Callback function publishes a frame captured from a camera to /video_framesX (X is specific
camera ID) every 0.1 seconds
"""

def publishCameraFrame(self):
# Capture frame-by-frame
# This method returns True/False as well as the video frame.
ret, frame = self.cap.read()

# Get bool parameter, isOn, and check the state of the camera to publish frame or not
# Value of isOn is controlled by setCameraState()
isOn = bool(self.get_parameter("isOn")._value)
if ret and isOn:
is_on = bool(self.get_parameter("isOn")._value)
if ret and is_on:
# Publish the image.
self._publisher.publish(self.br.cv2_to_compressed_imgmsg(frame))

"""
Function reads str msgs from topic, 'camera_control' (published by mission control), and
turns on/off the periodic publishing of frames by a certain camera
Function is called whenever a msg is published to 'camera_control'
Function assumes str msgs sent to 'camera_control' begin with a number and ends with an 'f'
(meaning off) or 'n' (meaning on)
ex.
"0 off" -> to turn off camera with id=0
"2 on" -> to turn on camera with id=2
"""
def setCameraState(self, msg: String) -> None:
"""
Function reads str msgs from topic, 'camera_control' (published by mission control), and
turns on/off the periodic publishing of frames by a certain camera
Function is called whenever a msg is published to 'camera_control'
Function assumes str msgs sent to 'camera_control' begin with a number and ends with an 'f'
(meaning off) or 'n' (meaning on)
ex.
"0 off" -> to turn off camera with id=0
"2 on" -> to turn on camera with id=2
"""

def setCameraState(self, msg) -> None:
# Check if this specific camera is targeted by mission control, stop the function otherwise
cameraNumber = int(self.get_parameter("cameraNumber")._value)
if int(msg.data[0]) != cameraNumber:
camera_number = int(self.get_parameter("cameraNumber")._value)
if int(msg.data[0]) != camera_number:
return

# Check if the desired camera state is already fulfilled, print and stop the function if so
cameraOn = bool(self.get_parameter("isOn")._value)
if str(msg.data[-1]) == "f" and not cameraOn:
self.get_logger().info("camera" + str(cameraNumber) + " is already off!")
camera_on = bool(self.get_parameter("isOn")._value)
if str(msg.data[-1]) == "f" and not camera_on:
self.get_logger().info("camera" + str(camera_number) + " is already off!")
return
elif str(msg.data[-1]) == "n" and cameraOn:
self.get_logger().info("camera" + str(cameraNumber) + " is already on!")
elif str(msg.data[-1]) == "n" and camera_on:
self.get_logger().info("camera" + str(camera_number) + " is already on!")
return

# Set isOn bool parameter to either stop or restart the publishing of frames
Expand All @@ -131,20 +130,24 @@ def setCameraState(self, msg) -> None:
self.declare_parameter("isOn", True)


def main(args=None):
def main(args: list[str] | None = None) -> None:
"""
The entry point of the node.
"""

rclpy.init(args=args)
try:
# We need an executor because running .spin() is a blocking function.
# using the MultiThreadedExecutor, we can control multiple nodes
executor = MultiThreadedExecutor()
nodes = []
cameraNum = 0
camera_num = 0

for cameraID in getCameras():
node = RosCamera("video_frames" + str(cameraNum), cameraID)
for camera_id in getCameras():
node = RosCamera("video_frames" + str(camera_num), camera_id)
nodes.append(node)
executor.add_node(node)
cameraNum += 1
camera_num += 1

try:
executor.spin()
Expand Down

0 comments on commit 451bc55

Please sign in to comment.