diff --git a/src/camera/camera/roscamera.py b/src/camera/camera/roscamera.py index f814d68..14dd635 100644 --- a/src/camera/camera/roscamera.py +++ b/src/camera/camera/roscamera.py @@ -1,5 +1,3 @@ -from pathlib import Path - import cv2 # OpenCV library import rclpy # Python Client Library for ROS 2 from cv_bridge import CvBridge # Package to convert between ROS and OpenCV Images @@ -8,30 +6,30 @@ from sensor_msgs.msg import CompressedImage # Image is the message type from std_msgs.msg import String -""" -Function returns a list of working camera IDs to capture every camera connected to the robot -""" - def getCameras() -> list[int]: - nonWorkingPorts = 0 - devPort = 0 - workingPorts = [] + """ + Function returns a list of working camera IDs to capture every camera connected to the robot + """ - while nonWorkingPorts < 6: - camera = cv2.VideoCapture(devPort) + non_working_ports = 0 + dev_port = 0 + working_ports = [] + + while non_working_ports < 6: + camera = cv2.VideoCapture(dev_port) if not camera.isOpened(): - nonWorkingPorts += 1 + non_working_ports += 1 else: - isReading, img = camera.read() + is_reading, img = camera.read() _ = camera.get(3) _ = camera.get(4) - if isReading: - workingPorts.append(devPort) + if is_reading: + working_ports.append(dev_port) - devPort += 1 + dev_port += 1 - return workingPorts + return working_ports class RosCamera(Node): @@ -50,10 +48,11 @@ def __init__(self, topicName: str, camera: int): # ex. topic message: "0 off", "0 on" # The topic message should always have the corresponding camera number # at the first index of each message. - subTopicName = "camera_control" - self.subscription = self.create_subscription(String, subTopicName, self.setCameraState, 10) - self.subscription - self.get_logger().info("Created Subscription to" + subTopicName) + sub_topic_name = "camera_control" + self.subscription = self.create_subscription( + String, sub_topic_name, self.setCameraState, 10 + ) + self.get_logger().info("Created Subscription to" + sub_topic_name) # We will publish a message every 0.1 seconds # timer_period = 0.1 # seconds @@ -76,49 +75,49 @@ def __init__(self, topicName: str, camera: int): self.declare_parameter("cameraNumber", camera) self.declare_parameter("isOn", True) - """ - Callback function publishes a frame captured from a camera to /video_framesX (X is specific - camera ID) every 0.1 seconds - """ + def publishCameraFrame(self) -> None: + """ + Callback function publishes a frame captured from a camera to /video_framesX (X is specific + camera ID) every 0.1 seconds + """ - def publishCameraFrame(self): # Capture frame-by-frame # This method returns True/False as well as the video frame. ret, frame = self.cap.read() # Get bool parameter, isOn, and check the state of the camera to publish frame or not # Value of isOn is controlled by setCameraState() - isOn = bool(self.get_parameter("isOn")._value) - if ret and isOn: + is_on = bool(self.get_parameter("isOn")._value) + if ret and is_on: # Publish the image. self._publisher.publish(self.br.cv2_to_compressed_imgmsg(frame)) - """ - Function reads str msgs from topic, 'camera_control' (published by mission control), and - turns on/off the periodic publishing of frames by a certain camera - - Function is called whenever a msg is published to 'camera_control' - - Function assumes str msgs sent to 'camera_control' begin with a number and ends with an 'f' - (meaning off) or 'n' (meaning on) - ex. - "0 off" -> to turn off camera with id=0 - "2 on" -> to turn on camera with id=2 - """ + def setCameraState(self, msg: String) -> None: + """ + Function reads str msgs from topic, 'camera_control' (published by mission control), and + turns on/off the periodic publishing of frames by a certain camera + + Function is called whenever a msg is published to 'camera_control' + + Function assumes str msgs sent to 'camera_control' begin with a number and ends with an 'f' + (meaning off) or 'n' (meaning on) + ex. + "0 off" -> to turn off camera with id=0 + "2 on" -> to turn on camera with id=2 + """ - def setCameraState(self, msg) -> None: # Check if this specific camera is targeted by mission control, stop the function otherwise - cameraNumber = int(self.get_parameter("cameraNumber")._value) - if int(msg.data[0]) != cameraNumber: + camera_number = int(self.get_parameter("cameraNumber")._value) + if int(msg.data[0]) != camera_number: return # Check if the desired camera state is already fulfilled, print and stop the function if so - cameraOn = bool(self.get_parameter("isOn")._value) - if str(msg.data[-1]) == "f" and not cameraOn: - self.get_logger().info("camera" + str(cameraNumber) + " is already off!") + camera_on = bool(self.get_parameter("isOn")._value) + if str(msg.data[-1]) == "f" and not camera_on: + self.get_logger().info("camera" + str(camera_number) + " is already off!") return - elif str(msg.data[-1]) == "n" and cameraOn: - self.get_logger().info("camera" + str(cameraNumber) + " is already on!") + elif str(msg.data[-1]) == "n" and camera_on: + self.get_logger().info("camera" + str(camera_number) + " is already on!") return # Set isOn bool parameter to either stop or restart the publishing of frames @@ -131,20 +130,24 @@ def setCameraState(self, msg) -> None: self.declare_parameter("isOn", True) -def main(args=None): +def main(args: list[str] | None = None) -> None: + """ + The entry point of the node. + """ + rclpy.init(args=args) try: # We need an executor because running .spin() is a blocking function. # using the MultiThreadedExecutor, we can control multiple nodes executor = MultiThreadedExecutor() nodes = [] - cameraNum = 0 + camera_num = 0 - for cameraID in getCameras(): - node = RosCamera("video_frames" + str(cameraNum), cameraID) + for camera_id in getCameras(): + node = RosCamera("video_frames" + str(camera_num), camera_id) nodes.append(node) executor.add_node(node) - cameraNum += 1 + camera_num += 1 try: executor.spin()