diff --git a/aria_data_loaders/aria_data_utils/human_sensor_data_streamer_interface.py b/aria_data_loaders/aria_data_utils/human_sensor_data_streamer_interface.py
index 06c565d9..63e30577 100644
--- a/aria_data_loaders/aria_data_utils/human_sensor_data_streamer_interface.py
+++ b/aria_data_loaders/aria_data_utils/human_sensor_data_streamer_interface.py
@@ -10,42 +10,50 @@
 import cv2
 import numpy as np
 import rospy
-import sophus as sp
+
+try:
+    import sophuspy as sp
+except Exception as e:
+    print(f"Cannot import sophuspy due to {e}. Import sophus instead")
+    import sophus as sp
+
+# ROS imports
+from geometry_msgs.msg import Pose, PoseStamped
+from nav_msgs.msg import Odometry
 from perception_and_utils.perception.detector_wrappers.april_tag_detector import (
     AprilTagDetectorWrapper,
 )
-from perception_and_utils.perception.detector_wrappers.object_detector import (
-    ObjectDetectorWrapper,
-)
 from perception_and_utils.perception.detector_wrappers.human_motion_detector import (
     HumanMotionDetector,
 )
-
+from perception_and_utils.perception.detector_wrappers.object_detector import (
+    ObjectDetectorWrapper,
+)
+from perception_and_utils.utils.data_frame import DataFrame
 from spot_rl.utils.utils import ros_frames as rf
-
-# ROS imports
-from geometry_msgs.msg import Pose, PoseStamped
-from nav_msgs.msg import Odometry
 from std_msgs.msg import String
 from tf2_ros import StaticTransformBroadcaster
 from visualization_msgs.msg import Marker
-from perception_and_utils.utils.data_frame import DataFrame


 class CameraParams:
     """
     CameraParams class to store camera parameters like focal length, principal point etc.
     It stores the image, timestamp and frame number of each received image
     """
-    def __init__(self, focal_lengths: Tuple[float, float], principal_point: Tuple[float, float]):
+
+    def __init__(
+        self, focal_lengths: Tuple[float, float], principal_point: Tuple[float, float]
+    ):
         self._focal_lengths = focal_lengths  # type: Optional[Tuple[float, float]]
         self._principal_point = principal_point  # type: Optional[Tuple[float, float]]

+
 class HumanSensorDataStreamerInterface:
     def __init__(self, verbose: bool = False) -> None:
         self.verbose = verbose

-        self._is_connected = False # TODO : Maybe not needed
+        self._is_connected = False  # TODO : Maybe not needed

         # TODO: Update this later with PerceptionPipeline
         # Create all detectors
@@ -57,19 +65,22 @@ def __init__(self, verbose: bool = False) -> None:
         self.static_tf_broadcaster = StaticTransformBroadcaster()

         self.human_activity_history_pub = rospy.Publisher(
-            "/human_activity_history", String, queue_size=10)
+            "/human_activity_history", String, queue_size=10
+        )

         # TODO: Define DEVICE FRAME as a visual frame of reference i.e. front facing frame of reference
-        self.device_T_camera: Optional[sp.SE3] = None # TODO: Should be initialized by implementations of this class
+        self.device_T_camera: Optional[
+            sp.SE3
+        ] = None  # TODO: Should be initialized by implementations of this class

         # Maintain a list of all poses where qr code is detected (w.r.t deviceWorld)
-        self.marker_positions_list = (
-            []
-        )  # type: List[np.ndarray] # List of position as np.ndarray (x, y, z)
-        self.marker_quaternion_list = (
-            []
-        )  # type: List[np.ndarray] # List of quaternions as np.ndarray (x, y, z, w)
-        self.avg_deviceWorld_T_marker = None  # type: Optional[sp.SE3]
+        self.marker_positions_list: List[
+            np.ndarray
+        ] = []  # List of position as np.ndarray (x, y, z)
+        self.marker_quaternion_list: List[
+            np.ndarray
+        ] = []  # List of quaternions as np.ndarray (x, y, z, w)
+        self.avg_deviceWorld_T_marker: Optional[sp.SE3] = None

     def connect(self):
         """
@@ -155,64 +166,65 @@ def initialize_object_detector(
         raise NotImplementedError

     def initialize_human_motion_detector(self, outputs: dict = {}):
-        """
-        """
+        """ """
         raise NotImplementedError

     ##### ROS Publishers #####
-    def publish_human_pose(self, data_frame: DataFrame):
-        """
-        Publishes current pose of Device as a pose of interest for Spot
-
-        Args:
-            data_frame (DataFrame): DataFrame object containing data packet with rgb, depth
-                and all necessary transformations
-
-        Publishes:
-            - /human_pose_publisher: ROS PoseStamped message for quest3World_T_rgb # TODO: Update this from rgb to VIZ frame
-        """
-        # Publish as pose for detail
-        pose_stamped = PoseStamped()
-        pose_stamped.header.stamp = rospy.Time().now()
-        pose_stamped.header.seq = data_frame._frame_number
-        pose_stamped.header.frame_id = rf.QUEST3_CAMERA
-        pose_stamped.pose = data_frame._deviceWorld_T_camera_rgb # TODO: Use a VIZ frame which is forward facing
-        self.human_pose_publisher.publish(pose_stamped)
-
-    def publish_marker(self, data_frame: DataFrame, marker_scale: float=0.1):
-        """
-        Publishes marker at pose of interest
-
-        Args:
-            data_frame (DataFrame): DataFrame object containing data packet with rgb, depth
-                and all necessary transformations
-
-        Publishes:
-            - /pose_of_interest_marker_publisher: ROS PoseStamped message for quest3World_T_rgb # TODO: Update this from rgb to VIZ frame
-        """
-        # Publish as marker for interpretability
-        marker = Marker()
-        marker.header.stamp = rospy.Time().now()
-        marker.header.frame_id = rf.rf.QUEST3_CAMERA
-        marker.id = self._frame_number
-
-        marker.type = Marker.SPHERE
-        marker.action = Marker.ADD
-
-        marker.pose = data_frame._deviceWorld_T_camera_rgb
-
-        marker.scale.x = marker_scale
-        marker.scale.y = marker_scale
-        marker.scale.z = marker_scale
-
-        marker.color.a = 1.0
-        marker.color.r = 0.45
-        marker.color.g = 0.95
-        marker.color.b = 0.2
-
-        self.pose_of_interest_marker_publisher.publish(marker)
-
-    def publish_human_activity_history(self, human_history: List[Tuple[float,str]]):
+    # def publish_human_pose(self, data_frame: DataFrame):
+    #     """
+    #     Publishes current pose of Device as a pose of interest for Spot

+    #     Args:
+    #         data_frame (DataFrame): DataFrame object containing data packet with rgb, depth
+    #             and all necessary transformations

+    #     Publishes:
+    #         - /human_pose_publisher: ROS PoseStamped message for quest3World_T_rgb # TODO: Update this from rgb to VIZ frame
+    #     """
+    #     # Publish as pose for detail
+    #     pose_stamped = PoseStamped()
+    #     pose_stamped.header.stamp = rospy.Time().now()
+    #     pose_stamped.header.seq = data_frame._frame_number
+    #     pose_stamped.header.frame_id = rf.QUEST3_CAMERA
+    #     pose_stamped.pose = (
+    #         data_frame._deviceWorld_T_camera_rgb
+    #     )  # TODO: Use a VIZ frame which is forward facing
+    #     self.human_pose_publisher.publish(pose_stamped)

+    # def publish_marker(self, data_frame: DataFrame, marker_scale: float = 0.1):
+    #     """
+    #     Publishes marker at pose of interest

+    #     Args:
+    #         data_frame (DataFrame): DataFrame object containing data packet with rgb, depth
+    #             and all necessary transformations

+    #     Publishes:
+    #         - /pose_of_interest_marker_publisher: ROS PoseStamped message for quest3World_T_rgb # TODO: Update this from rgb to VIZ frame
+    #     """
+    #     # Publish as marker for interpretability
+    #     marker = Marker()
+    #     marker.header.stamp = rospy.Time().now()
+    #     marker.header.frame_id = rf.rf.QUEST3_CAMERA
+    #     marker.id = self._frame_number

+    #     marker.type = Marker.SPHERE
+    #     marker.action = Marker.ADD

+    #     marker.pose = data_frame._deviceWorld_T_camera_rgb

+    #     marker.scale.x = marker_scale
+    #     marker.scale.y = marker_scale
+    #     marker.scale.z = marker_scale

+    #     marker.color.a = 1.0
+    #     marker.color.r = 0.45
+    #     marker.color.g = 0.95
+    #     marker.color.b = 0.2

+    #     self.pose_of_interest_marker_publisher.publish(marker)

+    def publish_human_activity_history(self, human_history: List[Tuple[float, str]]):
         """
         Publishes human activity history as a string

@@ -227,7 +239,9 @@ def publish_human_activity_history(self, human_history: List[Tuple[float,str]]):
         # Iterate over human history and create a string
         human_history_str = ""
         for timestamp, activity in human_history:
-            human_history_str += f"{timestamp},{activity}|" # str = "time1,Standing|time2,Walking"
+            human_history_str += (
+                f"{timestamp},{activity}|"  # str = "time1,Standing|time2,Walking"
+            )

         # Publish human activity history string
         self.human_activity_history_pub.publish(human_history_str)
diff --git a/aria_data_loaders/aria_data_utils/quest3_data_streamer.py b/aria_data_loaders/aria_data_utils/quest3_data_streamer.py
index 37b90722..ae7d1a0b 100644
--- a/aria_data_loaders/aria_data_utils/quest3_data_streamer.py
+++ b/aria_data_loaders/aria_data_utils/quest3_data_streamer.py
@@ -19,6 +19,7 @@
 import logging
 import time
+import traceback
 from typing import Any, Dict, List, Optional, Tuple

 import click
@@ -26,7 +27,12 @@
 import matplotlib.pyplot as plt
 import numpy as np
 import rospy
-import sophus as sp
+
+try:
+    import sophuspy as sp
+except Exception as e:
+    print(f"Cannot import sophuspy due to {e}. Import sophus instead")
+    import sophus as sp
 from perception_and_utils.utils.conversions import (
     sophus_SE3_to_ros_PoseStamped,
     sophus_SE3_to_ros_TransformStamped,
@@ -47,7 +53,10 @@ def __init__(self, verbose: bool = False) -> None:
         handler.setFormatter(formatter)
         self.logger.addHandler(handler)

-        self.frame_rate_counter = FrameRateCounter()
+        self.frc_all = FrameRateCounter()
+        self.frc_qrd = FrameRateCounter()
+        self.frc_hmd = FrameRateCounter()
+        self.frc_od = FrameRateCounter()

         self.unified_quest3_camera = UnifiedQuestCamera()
         self.rgb_cam_params: CameraParams = None
@@ -60,14 +69,14 @@ def connect(self):
         Connect to Device
         """
         self._is_connected = True
-        self.logger("Quest3DataStreamer :: Connected")
+        self.logger.info("Quest3DataStreamer :: Connected")

     def disconnect(self):
         """
         Disconnects from Device cleanly
         """
         self._is_connected = False
-        self.logger("Quest3DataStreamer :: Disconnected")
+        self.logger.info("Quest3DataStreamer :: Disconnected")

     def is_connected(self) -> bool:
         return (
@@ -167,10 +176,11 @@ def get_latest_data_frame(self) -> Optional[DataFrame]:
         data_frame._aligned_depth_frame = None  # TODO: Add this logic

         # Update frame rate counter
-        self.frame_rate_counter.update()
+        # self.frame_rate_counter.update()
         data_frame._avg_rgb_fps = self.unified_quest3_camera.get_avg_fps_rgb()
         data_frame._avg_depth_fps = self.unified_quest3_camera.get_avg_fps_depth()
-        data_frame._avg_data_frame_fps = self.frame_rate_counter.avg_value()
+        # data_frame._avg_data_frame_fps = self.frame_rate_counter.avg_value()
+        data_frame._avg_data_frame_fps = 0.0

         return data_frame

@@ -183,15 +193,32 @@ def process_frame(
         detect_human_motion: bool = False,
         object_label="milk bottle",
     ) -> Tuple[np.ndarray, dict]:
+        rate_all = 0.0
+        rate_qrd = 0.0
+        rate_hmd = 0.0
+        rate_od = 0.0
+
+        self.frc_all.start()
+
         # Initialize object_scores to empty dict for current image frame
         # object_scores = {}  # type: Dict[str, Any]

         # Get rgb image from frame
         viz_img = data_frame._rgb_frame
         if viz_img is None:
-            rospy.logwarn("No image found in frame")
+            rospy.logwarn("No image found in frame")  # This gets over-written.. WASTE!
+
+        if detect_objects:
+            self.frc_od.start()
+            viz_img, object_scores = self.object_detector.process_frame(data_frame)
+            # if object_label in object_scores.keys():
+            #     if self.verbose:
+            #         plt.imsave(f"frame_{self.frame_number}.jpg", viz_img)
+            #     self.publish_pose_of_interest(frame.get("ros_pose"))
+            rate_od = self.frc_od.stop()

         if detect_qr:
+            self.frc_qrd.start()
             (viz_img, camera_T_marker) = self.april_tag_detector.process_frame(frame=data_frame)  # type: ignore

             deviceWorld_T_camera = data_frame._deviceWorld_T_camera_rgb
@@ -213,9 +240,9 @@ def process_frame(
                     self.marker_quaternion_list,
                     self.avg_deviceWorld_T_marker,
                 ) = get_running_avg_a_T_b(
-                    current_avg_a_T_b=self.avg_deviceWorld_T_marker,
-                    a_T_b_position_list=self.marker_positions_list,
-                    a_T_b_quaternion_list=self.marker_quaternion_list,
+                    current_avg_a_T_b=self.avg_deviceWorld_T_marker,  # type: ignore
+                    a_T_b_position_list=self.marker_positions_list,  # type: ignore
+                    a_T_b_quaternion_list=self.marker_quaternion_list,  # type: ignore
                     a_T_intermediate=deviceWorld_T_camera,
                     intermediate_T_b=camera_T_marker,
                 )
@@ -240,7 +267,10 @@ def process_frame(
                 viz_img=viz_img,
             )

+            rate_qrd = self.frc_qrd.stop()
+
         if detect_human_motion:
+            self.frc_hmd.start()
             activity_str, avg_velocity = self.human_motion_detector.process_frame(
                 frame=data_frame
             )
@@ -253,14 +283,24 @@ def process_frame(
             self.publish_human_activity_history(
                 self.human_motion_detector.get_human_motion_history()
             )
+            rate_hmd = self.frc_hmd.stop()

         if detect_objects:
+            self.frc_od.start()
             viz_img, object_scores = self.object_detector.process_frame(viz_img)
             if object_label in object_scores.keys():
                 if self.verbose:
                     plt.imsave(f"frame_{self.frame_number}.jpg", viz_img)
                 # self.publish_pose_of_interest(frame.get("ros_pose"))
+            rate_od = self.frc_od.stop()
+
+        rate_all = self.frc_all.stop()
+
+        outputs["process_all"] = rate_all
+        outputs["process_qr"] = rate_qrd
+        outputs["process_od"] = rate_od
+        outputs["process_hmd"] = rate_hmd

         return viz_img, outputs

     def initialize_april_tag_detector(self, outputs: dict = {}):
@@ -345,11 +385,20 @@ def main(do_update_iptables: bool, debug: bool, hz: int):
     rospy.init_node("quest3_data_streamer", log_level=_log_level)
     rospy.loginfo("Starting quest3_data_streamer node")

+    # Logger for profiling
+    loggerp = logging.getLogger("Profiler-Quest3ProcessFrame")
+    loggerp.setLevel(logging.INFO)
+    csv_handlerp = logging.FileHandler("quest3_time_profiles_sec.txt")
+    formatterp = logging.Formatter("%(asctime)s, %(message)s")
+    csv_handlerp.setFormatter(formatterp)
+    loggerp.addHandler(csv_handlerp)
+
     if do_update_iptables:
         update_iptables_quest3()
     else:
         rospy.logwarn("Not updating iptables")

+    outer_frc = FrameRateCounter()
     # rate = rospy.Rate(hz)
     outputs: Dict[str, Any] = {}
     data_streamer = None
@@ -364,6 +413,7 @@ def main(do_update_iptables: bool, debug: bool, hz: int):
         data_streamer.initialize_human_motion_detector()
         # data_streamer.connect()
         while not rospy.is_shutdown():
+            outer_frc.start()
             data_frame = data_streamer.get_latest_data_frame()
             if data_frame is not None:
                 if debug:
@@ -380,9 +430,15 @@ def main(do_update_iptables: bool, debug: bool, hz: int):
                     cv2.waitKey(1)
             else:
                 print("No data frame received.")
-    except Exception as e:
+
+            outer_rate = outer_frc.stop()
+            msg_str = f"Fetch+AllDetection:{outer_rate},All_detectors={outputs.get('process_all',0.0)},QR={outputs.get('process_qr',0.0)},OD={outputs.get('process_od',0.0)},HMD={outputs.get('process_hmd',0.0)}"
+            loggerp.info(msg_str)
+            print(msg_str)
+
+    except Exception:
         print("Ending script.")
-        rospy.logwarn(f"Exception: {e}")
+        rospy.logwarn(f"Exception: {traceback.format_exc()}")

     if data_streamer is not None:
         data_streamer.disconnect()
diff --git a/aria_data_loaders/aria_data_utils/unified_quest_camera_interface.py b/aria_data_loaders/aria_data_utils/unified_quest_camera_interface.py
index a6108b00..503f41a1 100644
--- a/aria_data_loaders/aria_data_utils/unified_quest_camera_interface.py
+++ b/aria_data_loaders/aria_data_utils/unified_quest_camera_interface.py
@@ -3,7 +3,12 @@
 # LICENSE file in the root directory of this source tree.

 from typing import Optional, Tuple
-import sophus as sp
+
+try:
+    import sophuspy as sp
+except Exception as e:
+    print(f"Cannot import sophuspy due to {e}. Import sophus instead")
+    import sophus as sp
 import numpy as np

 """
@@ -51,9 +56,9 @@ def get_device_T_depthCamera(self) -> Optional[sp.SE3]:

     def get_avg_fps_rgb(self):
         raise NotImplementedError
-    
+
     def get_avg_fps_depth(self):
         raise NotImplementedError

-    def is_connected():
+    def is_connected(self):
         raise NotImplementedError
diff --git a/perception_and_utils_root/perception_and_utils/perception/detector_wrappers/human_motion_detector.py b/perception_and_utils_root/perception_and_utils/perception/detector_wrappers/human_motion_detector.py
index 3a83a9a9..6a46803c 100644
--- a/perception_and_utils_root/perception_and_utils/perception/detector_wrappers/human_motion_detector.py
+++ b/perception_and_utils_root/perception_and_utils/perception/detector_wrappers/human_motion_detector.py
@@ -5,19 +5,25 @@
 import logging
 import os
-from typing import Any, Dict, Tuple
+from typing import Any, Dict, List, Tuple

 import numpy as np
-import sophus as sp
+
+try:
+    import sophuspy as sp
+except Exception as e:
+    print(f"Cannot import sophuspy due to {e}. Import sophus instead")
+    import sophus as sp
+
 from perception_and_utils.perception.detector_wrappers.generic_detector_interface import (
     GenericDetector,
 )
-from perception_and_utils.utils.image_utils import label_img, decorate_img_with_fps
 from perception_and_utils.utils.data_frame import DataFrame
+from perception_and_utils.utils.image_utils import decorate_img_with_fps, label_img
+

 class HumanMotionDetector(GenericDetector):
-    """
-    """
+    """ """

     def __init__(self):
         super().__init__()
@@ -26,8 +32,9 @@ def __init__(self):
     def _init_human_motion_detector(
         self,
         # velocity_threshold=1.2,
-        velocity_threshold=0.4, # hacking this
-        time_horizon_ns=1000000000):
+        velocity_threshold=0.4,  # hacking this
+        time_horizon_ns=1000000000,
+    ):
         """
         Initialize the human motion detector

@@ -39,16 +46,15 @@
         self.enable_detector()

         self._velocity_threshold = velocity_threshold
-        self._time_horizon_ns = time_horizon_ns # 1 second
-        self._previous_frames = [] # type: List[Tuple[np.ndarray, float]]
-        self._len_prev_frame_cache = 100 # Keep this value larger than fps
-
+        self._time_horizon_ns = time_horizon_ns  # 1 second
+        self._previous_frames = []  # type: List[Tuple[np.ndarray, float]]
+        self._len_prev_frame_cache = 100  # Keep this value larger than fps
-        self._human_motion_history = [] # type: List[Tuple[float, str]]
+        self._human_motion_history = []  # type: List[Tuple[float, str]]

     def process_frame(
         self,
-        frame: DataFrame, # new data frame
+        frame: DataFrame,  # new data frame
     ) -> Tuple[str, float]:
         # Do nothing if detector is not enabled
         if self.is_enabled is False:
@@ -64,8 +70,8 @@ def process_frame(
         self._previous_frames.append((current_position, frame._timestamp_s))

         # If the cache is just 1 element or less, return "Standing"
-        lookback_index=10
-        if len(self._previous_frames) < lookback_index+1:
+        lookback_index = 10
+        if len(self._previous_frames) < lookback_index + 1:
             return "Standing", None

         # If the cache is full, remove the oldest frame
@@ -73,9 +79,15 @@
             self._previous_frames.pop(0)

         # Calculate the Euclidean distance between now & then
-        print(f"Current position: {current_position} | Previous position: {self._previous_frames[lookback_index][0]}")
-        distance = np.linalg.norm(current_position - self._previous_frames[lookback_index][0])
-        time = frame._timestamp_s - self._previous_frames[lookback_index][1] # as we are considering x frames per second so our window is 1 sec
+        print(
+            f"Current position: {current_position} | Previous position: {self._previous_frames[lookback_index][0]}"
+        )
+        distance = np.linalg.norm(
+            current_position - self._previous_frames[lookback_index][0]
+        )
+        time = (
+            frame._timestamp_s - self._previous_frames[lookback_index][1]
+        )  # as we are considering x frames per second so our window is 1 sec
         print(f"Distance: {distance} m | Time: {time} sec")

         avg_velocity = distance / time
@@ -93,8 +105,7 @@ def get_outputs(
         img_frame: np.ndarray,
         outputs: Dict,
     ) -> Tuple[np.ndarray, Dict]:
-        """
-        """
+        """ """
         viz_img = img_frame.copy()
         # Decorate image with text for visualization
         label_img(
@@ -107,12 +118,15 @@ def get_outputs(
             text=f"Velocity: {outputs['velocity']}",
             org=(50, 250),
         )
-        decorate_img_with_fps(viz_img, outputs["data_frame_fps"],pos=(50, 400))
+        decorate_img_with_fps(viz_img, outputs["data_frame_fps"], pos=(50, 400))
         return viz_img, outputs

     def update_human_motion_history(self, state, timestamp):
         # If current state of human is different from last state in history list, only then update history
-        if len(self._human_motion_history) == 0 or self._human_motion_history[-1][1] != state:
+        if (
+            len(self._human_motion_history) == 0
+            or self._human_motion_history[-1][1] != state
+        ):
             self._human_motion_history.append((timestamp, state))

     def get_human_motion_history(self):
diff --git a/perception_and_utils_root/perception_and_utils/perception/detector_wrappers/object_detector.py b/perception_and_utils_root/perception_and_utils/perception/detector_wrappers/object_detector.py
index 27540a81..b5ebf0e0 100644
--- a/perception_and_utils_root/perception_and_utils/perception/detector_wrappers/object_detector.py
+++ b/perception_and_utils_root/perception_and_utils/perception/detector_wrappers/object_detector.py
@@ -10,11 +10,11 @@
 from perception_and_utils.perception.detector_wrappers.generic_detector_interface import (
     GenericDetector,
 )
+from perception_and_utils.utils.data_frame import DataFrame
 from perception_and_utils.utils.image_utils import (
     centered_object_detection_heuristic,
     check_bbox_intersection,
 )
-from perception_and_utils.utils.data_frame import DataFrame
 from spot_rl.models.owlvit import OwlVit

@@ -226,7 +226,9 @@ def process_frame(self, frame: DataFrame) -> Tuple[np.ndarray, Dict[str, Any]]:
         )
         return updated_img_frame, object_scores

-    def process_frame_offline(self, frame: DataFrame) -> Tuple[np.ndarray, Dict[str, Any]]:
+    def process_frame_offline(
+        self, frame: DataFrame
+    ) -> Tuple[np.ndarray, Dict[str, Any]]:
         """
         Process image frame to detect object instances and score them based on heuristics

@@ -239,6 +241,8 @@ def process_frame_offline(self, frame: DataFrame) -> Tuple[np.ndarray, Dict[str,
             updated_img_frame (np.ndarray) : Image frame with detections and text for visualization
             object_scores (Dict[str, float]) : Dictionary of scores for each object in the image frame
         """
+        img_frame = frame._rgb_frame
+
         # Do nothing if detector is not enabled
         if self.is_enabled is False:
             self._logger.warning(
diff --git a/perception_and_utils_root/perception_and_utils/utils/data_frame.py b/perception_and_utils_root/perception_and_utils/utils/data_frame.py
index 116bd16a..54b06dea 100644
--- a/perception_and_utils_root/perception_and_utils/utils/data_frame.py
+++ b/perception_and_utils_root/perception_and_utils/utils/data_frame.py
@@ -1,21 +1,30 @@
+from typing import Optional
+
 import numpy as np
-import sophus as sp
+
+try:
+    import sophuspy as sp
+except Exception as e:
+    print(f"Cannot import sophuspy due to {e}. Import sophus instead")
+    import sophus as sp
+

 class DataFrame:
     """
     DataFrame class to store instantaneous data from Human Sensor stream.
     It stores the image, timestamp and frame number of each received image
     """
+
     def __init__(self):
         self._frame_number = -1
         self._timestamp_s = -1
-        self._avg_rgb_fps = None # type: Optional[float]
-        self._avg_depth_fps = None # type: Optional[float]
-        self._avg_data_frame_fps = None # type: Optional[float]
-        self._rgb_frame = None # type: Optional[np.ndarray]
-        self._depth_frame = None # type: Optional[np.ndarray]
-        self._aligned_depth_frame = None # type: Optional[np.ndarray]
-        self._deviceWorld_T_camera_rgb = None # type: Optional[sp.SE3]
-        self._deviceWorld_T_camera_depth = None # type: Optional[sp.SE3]
-        self._device_T_camera_rgb = None # type: Optional[sp.SE3]
-        self._device_T_camera_depth = None # type: Optional[sp.SE3]
+        self._avg_rgb_fps = None  # type: Optional[float]
+        self._avg_depth_fps = None  # type: Optional[float]
+        self._avg_data_frame_fps = None  # type: Optional[float]
+        self._rgb_frame = None  # type: Optional[np.ndarray]
+        self._depth_frame = None  # type: Optional[np.ndarray]
+        self._aligned_depth_frame = None  # type: Optional[np.ndarray]
+        self._deviceWorld_T_camera_rgb = None  # type: Optional[sp.SE3]
+        self._deviceWorld_T_camera_depth = None  # type: Optional[sp.SE3]
+        self._device_T_camera_rgb = None  # type: Optional[sp.SE3]
+        self._device_T_camera_depth = None  # type: Optional[sp.SE3]