Skip to content

Commit

Permalink
Add Profiling code
Browse files Browse the repository at this point in the history
  • Loading branch information
KavitShah1998 committed Oct 24, 2024
1 parent 8f5ac3e commit d05874f
Show file tree
Hide file tree
Showing 6 changed files with 229 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,42 +10,50 @@
import cv2
import numpy as np
import rospy
import sophus as sp

try:
import sophuspy as sp
except Exception as e:
print(f"Cannot import sophuspy due to {e}. Import sophus instead")
import sophus as sp

# ROS imports
from geometry_msgs.msg import Pose, PoseStamped
from nav_msgs.msg import Odometry
from perception_and_utils.perception.detector_wrappers.april_tag_detector import (
AprilTagDetectorWrapper,
)
from perception_and_utils.perception.detector_wrappers.object_detector import (
ObjectDetectorWrapper,
)
from perception_and_utils.perception.detector_wrappers.human_motion_detector import (
HumanMotionDetector,
)

from perception_and_utils.perception.detector_wrappers.object_detector import (
ObjectDetectorWrapper,
)
from perception_and_utils.utils.data_frame import DataFrame
from spot_rl.utils.utils import ros_frames as rf

# ROS imports
from geometry_msgs.msg import Pose, PoseStamped
from nav_msgs.msg import Odometry
from std_msgs.msg import String
from tf2_ros import StaticTransformBroadcaster
from visualization_msgs.msg import Marker

from perception_and_utils.utils.data_frame import DataFrame

class CameraParams:
"""
CameraParams class to store camera parameters like focal length, principal point etc.
It stores the image, timestamp and frame number of each received image
"""
def __init__(self, focal_lengths: Tuple[float, float], principal_point: Tuple[float, float]):

def __init__(
self, focal_lengths: Tuple[float, float], principal_point: Tuple[float, float]
):
self._focal_lengths = focal_lengths # type: Optional[Tuple[float, float]]
self._principal_point = principal_point # type: Optional[Tuple[float, float]]


class HumanSensorDataStreamerInterface:
def __init__(self, verbose: bool = False) -> None:
self.verbose = verbose

self._is_connected = False # TODO : Maybe not needed
self._is_connected = False # TODO : Maybe not needed

# TODO: Update this later with PerceptionPipeline
# Create all detectors
Expand All @@ -57,19 +65,22 @@ def __init__(self, verbose: bool = False) -> None:
self.static_tf_broadcaster = StaticTransformBroadcaster()

self.human_activity_history_pub = rospy.Publisher(
"/human_activity_history", String, queue_size=10)
"/human_activity_history", String, queue_size=10
)

# TODO: Define DEVICE FRAME as a visual frame of reference i.e. front facing frame of reference
self.device_T_camera: Optional[sp.SE3] = None # TODO: Should be initialized by implementations of this class
self.device_T_camera: Optional[
sp.SE3
] = None # TODO: Should be initialized by implementations of this class

# Maintain a list of all poses where qr code is detected (w.r.t deviceWorld)
self.marker_positions_list = (
[]
) # type: List[np.ndarray] # List of position as np.ndarray (x, y, z)
self.marker_quaternion_list = (
[]
) # type: List[np.ndarray] # List of quaternions as np.ndarray (x, y, z, w)
self.avg_deviceWorld_T_marker = None # type: Optional[sp.SE3]
self.marker_positions_list: List[
np.ndarray
] = [] # List of position as np.ndarray (x, y, z)
self.marker_quaternion_list: List[
np.ndarray
] = [] # List of quaternions as np.ndarray (x, y, z, w)
self.avg_deviceWorld_T_marker: Optional[sp.SE3] = None

def connect(self):
"""
Expand Down Expand Up @@ -155,64 +166,65 @@ def initialize_object_detector(
raise NotImplementedError

def initialize_human_motion_detector(self, outputs: dict = {}):
"""
"""
""" """
raise NotImplementedError

##### ROS Publishers #####
def publish_human_pose(self, data_frame: DataFrame):
"""
Publishes current pose of Device as a pose of interest for Spot
Args:
data_frame (DataFrame): DataFrame object containing data packet with rgb, depth
and all necessary transformations
Publishes:
- /human_pose_publisher: ROS PoseStamped message for quest3World_T_rgb # TODO: Update this from rgb to VIZ frame
"""
# Publish as pose for detail
pose_stamped = PoseStamped()
pose_stamped.header.stamp = rospy.Time().now()
pose_stamped.header.seq = data_frame._frame_number
pose_stamped.header.frame_id = rf.QUEST3_CAMERA
pose_stamped.pose = data_frame._deviceWorld_T_camera_rgb # TODO: Use a VIZ frame which is forward facing
self.human_pose_publisher.publish(pose_stamped)

def publish_marker(self, data_frame: DataFrame, marker_scale: float=0.1):
"""
Publishes marker at pose of interest
Args:
data_frame (DataFrame): DataFrame object containing data packet with rgb, depth
and all necessary transformations
Publishes:
- /pose_of_interest_marker_publisher: ROS PoseStamped message for quest3World_T_rgb # TODO: Update this from rgb to VIZ frame
"""
# Publish as marker for interpretability
marker = Marker()
marker.header.stamp = rospy.Time().now()
marker.header.frame_id = rf.rf.QUEST3_CAMERA
marker.id = self._frame_number

marker.type = Marker.SPHERE
marker.action = Marker.ADD

marker.pose = data_frame._deviceWorld_T_camera_rgb

marker.scale.x = marker_scale
marker.scale.y = marker_scale
marker.scale.z = marker_scale

marker.color.a = 1.0
marker.color.r = 0.45
marker.color.g = 0.95
marker.color.b = 0.2

self.pose_of_interest_marker_publisher.publish(marker)

def publish_human_activity_history(self, human_history: List[Tuple[float,str]]):
# def publish_human_pose(self, data_frame: DataFrame):
# """
# Publishes current pose of Device as a pose of interest for Spot

# Args:
# data_frame (DataFrame): DataFrame object containing data packet with rgb, depth
# and all necessary transformations

# Publishes:
# - /human_pose_publisher: ROS PoseStamped message for quest3World_T_rgb # TODO: Update this from rgb to VIZ frame
# """
# # Publish as pose for detail
# pose_stamped = PoseStamped()
# pose_stamped.header.stamp = rospy.Time().now()
# pose_stamped.header.seq = data_frame._frame_number
# pose_stamped.header.frame_id = rf.QUEST3_CAMERA
# pose_stamped.pose = (
# data_frame._deviceWorld_T_camera_rgb
# ) # TODO: Use a VIZ frame which is forward facing
# self.human_pose_publisher.publish(pose_stamped)

# def publish_marker(self, data_frame: DataFrame, marker_scale: float = 0.1):
# """
# Publishes marker at pose of interest

# Args:
# data_frame (DataFrame): DataFrame object containing data packet with rgb, depth
# and all necessary transformations

# Publishes:
# - /pose_of_interest_marker_publisher: ROS PoseStamped message for quest3World_T_rgb # TODO: Update this from rgb to VIZ frame
# """
# # Publish as marker for interpretability
# marker = Marker()
# marker.header.stamp = rospy.Time().now()
# marker.header.frame_id = rf.rf.QUEST3_CAMERA
# marker.id = self._frame_number

# marker.type = Marker.SPHERE
# marker.action = Marker.ADD

# marker.pose = data_frame._deviceWorld_T_camera_rgb

# marker.scale.x = marker_scale
# marker.scale.y = marker_scale
# marker.scale.z = marker_scale

# marker.color.a = 1.0
# marker.color.r = 0.45
# marker.color.g = 0.95
# marker.color.b = 0.2

# self.pose_of_interest_marker_publisher.publish(marker)

def publish_human_activity_history(self, human_history: List[Tuple[float, str]]):
"""
Publishes human activity history as a string
Expand All @@ -227,7 +239,9 @@ def publish_human_activity_history(self, human_history: List[Tuple[float,str]]):
# Iterate over human history and create a string
human_history_str = ""
for timestamp, activity in human_history:
human_history_str += f"{timestamp},{activity}|" # str = "time1,Standing|time2,Walking"
human_history_str += (
f"{timestamp},{activity}|" # str = "time1,Standing|time2,Walking"
)

# Publish human activity history string
self.human_activity_history_pub.publish(human_history_str)
Loading

0 comments on commit d05874f

Please sign in to comment.