From 06195e2e20d45729d95836cf05358ca2eecbfbde Mon Sep 17 00:00:00 2001 From: Vishwa Shah Date: Mon, 5 Aug 2024 05:24:31 -0700 Subject: [PATCH] feat: add dataframe <-> OSTk object utilities (#426) * feat: add dataframe <-> OSTk object utilities * fix: remove unused import --------- Co-authored-by: Kyle Cochran --- bindings/python/test/test_dataframe.py | 875 ++++++++++++++++++ bindings/python/test/test_display.py | 5 +- .../trajectory/state/test_numerical_solver.py | 4 +- .../python/ostk/astrodynamics/dataframe.py | 477 ++++++++++ .../python/ostk/astrodynamics/utilities.py | 2 +- 5 files changed, 1357 insertions(+), 6 deletions(-) create mode 100644 bindings/python/test/test_dataframe.py create mode 100644 bindings/python/tools/python/ostk/astrodynamics/dataframe.py diff --git a/bindings/python/test/test_dataframe.py b/bindings/python/test/test_dataframe.py new file mode 100644 index 000000000..8157c8db1 --- /dev/null +++ b/bindings/python/test/test_dataframe.py @@ -0,0 +1,875 @@ +# Copyright © Loft Orbital Solutions Inc. + +import pytest + +import numpy as np + +import pandas as pd + +from ostk.mathematics.geometry.d3.transformation.rotation import Quaternion + +from ostk.physics import Environment +from ostk.physics.time import Instant +from ostk.physics.time import DateTime +from ostk.physics.time import Duration +from ostk.physics.time import Scale +from ostk.physics.coordinate import Frame +from ostk.physics.coordinate import Position +from ostk.physics.coordinate import Velocity +from ostk.physics.coordinate.frame.provider.iau import Theory + +from ostk.astrodynamics.converters import coerce_to_datetime +from ostk.astrodynamics.trajectory import State +from ostk.astrodynamics.trajectory import LocalOrbitalFrameFactory +from ostk.astrodynamics.trajectory.state.coordinate_subset import CartesianPosition +from ostk.astrodynamics.trajectory.state.coordinate_subset import CartesianVelocity +from ostk.astrodynamics.flight import Profile + +from ostk.astrodynamics.dataframe import generate_states_from_dataframe +from ostk.astrodynamics.dataframe import generate_dataframe_from_states +from ostk.astrodynamics.dataframe import generate_profile_from_dataframe +from ostk.astrodynamics.dataframe import generate_dataframe_from_profile + + +class TestOrbitDataframe: + @pytest.fixture + def instant(self) -> Instant: + return Instant.date_time(DateTime.parse("2024-01-29T00:00:00"), Scale.UTC) + + @pytest.fixture + def frame(self) -> Frame: + return Frame.GCRF() + + @pytest.fixture + def position(self, frame: Frame) -> Position: + return Position.meters( + [755972.142139276024, -3390511.949699319433, 5955672.751532567665], + frame, + ) + + @pytest.fixture + def velocity(self, frame: Frame) -> Velocity: + return Velocity.meters_per_second( + [-563.764594800880, -6619.592151780337, -3685.668514834143], + frame, + ) + + @pytest.fixture + def attitude(self) -> Quaternion: + return Quaternion.xyzs( + -0.638160707740, -0.163520830523, 0.726693549038, 0.194751982966 + ) + + @pytest.fixture + def angular_velocity(self) -> np.ndarray: + return np.array([0.0, 0.0, 0.0]) + + @pytest.fixture + def orbit_state( + self, + instant: Instant, + position: Position, + velocity: Velocity, + ) -> State: + return State( + instant=instant, + position=position, + velocity=velocity, + ) + + @pytest.fixture + def orbit_states(self, orbit_state: State) -> list[State]: + return [orbit_state, orbit_state, orbit_state] + + @pytest.fixture + def profile_state( + self, + instant: Instant, + position: Position, + velocity: Velocity, + attitude: Quaternion, + angular_velocity: np.ndarray, + frame: Frame, + ) -> State: + return State(instant, position, velocity, attitude, angular_velocity, frame) + + @pytest.fixture + def profile_states(self, profile_state: State) -> list[State]: + return [profile_state, profile_state, profile_state] + + @pytest.fixture + def profile_dataframe_position_columns(self) -> list[str]: + return ["r_J2000 (IAU 2006)_x", "r_J2000 (IAU 2006)_y", "r_J2000 (IAU 2006)_z"] + + @pytest.fixture + def profile_dataframe_velocity_columns(self) -> list[str]: + return ["v_J2000 (IAU 2006)_x", "v_J2000 (IAU 2006)_y", "v_J2000 (IAU 2006)_z"] + + @pytest.fixture + def profile_dataframe_attitude_columns(self) -> list[str]: + return [ + "q_B_J2000 (IAU 2006)_x", + "q_B_J2000 (IAU 2006)_y", + "q_B_J2000 (IAU 2006)_z", + "q_B_J2000 (IAU 2006)_s", + ] + + @pytest.fixture + def profile_dataframe_angular_velocity_columns(self) -> list[str]: + return [ + "w_B_J2000 (IAU 2006)_in_B_x", + "w_B_J2000 (IAU 2006)_in_B_y", + "w_B_J2000 (IAU 2006)_in_B_z", + ] + + @pytest.fixture + def profile_dataframe( + self, + instant: Instant, + profile_dataframe_position_columns: list[str], + profile_dataframe_velocity_columns: list[str], + profile_dataframe_attitude_columns: list[str], + profile_dataframe_angular_velocity_columns: list[str], + ) -> pd.DataFrame: + return pd.DataFrame( + [ + { + "Timestamp": coerce_to_datetime(instant), + **dict(zip(profile_dataframe_position_columns, [1.0, 2.0, 3.0])), + **dict(zip(profile_dataframe_velocity_columns, [4.0, 5.0, 6.0])), + **dict(zip(profile_dataframe_attitude_columns, [0.0, 0.0, 0.0, 1.0])), + **dict( + zip(profile_dataframe_angular_velocity_columns, [0.0, 0.0, 0.0]) + ), + }, + { + "Timestamp": coerce_to_datetime(instant + Duration.minutes(1.0)), + **dict(zip(profile_dataframe_position_columns, [11.0, 12.0, 13.0])), + **dict(zip(profile_dataframe_velocity_columns, [14.0, 15.0, 16.0])), + **dict(zip(profile_dataframe_attitude_columns, [0.0, 0.0, 1.0, 0.0])), + **dict( + zip(profile_dataframe_angular_velocity_columns, [1.0, 1.0, 1.0]) + ), + }, + ] + ) + + @pytest.fixture + def profile_dataframe_indexed_timestamp( + self, + profile_dataframe: pd.DataFrame, + ) -> pd.DataFrame: + profile_dataframe.set_index("Timestamp", inplace=True) + return profile_dataframe + + @pytest.fixture + def orbit_dataframe(self, instant: Instant) -> pd.DataFrame: + return pd.DataFrame( + [ + { + "Timestamp": coerce_to_datetime(instant), + "r_GCRF_x": 1.0, + "r_GCRF_y": 2.0, + "r_GCRF_z": 3.0, + "v_GCRF_x": 4.0, + "v_GCRF_y": 5.0, + "v_GCRF_z": 6.0, + }, + { + "Timestamp": coerce_to_datetime(instant + Duration.minutes(1.0)), + "r_GCRF_x": 11.0, + "r_GCRF_y": 12.0, + "r_GCRF_z": 13.0, + "v_GCRF_x": 14.0, + "v_GCRF_y": 15.0, + "v_GCRF_z": 16.0, + }, + ] + ) + + @pytest.fixture + def orbit_dataframe_indexed_timestamp( + self, orbit_dataframe: pd.DataFrame + ) -> pd.DataFrame: + orbit_dataframe.set_index("Timestamp", inplace=True) + return orbit_dataframe + + def test_generate_orbit_states_from_dataframe_defaults_success( + self, + orbit_dataframe: pd.DataFrame, + ): + states: list[State] = generate_states_from_dataframe(orbit_dataframe) + + for state in states: + assert len(state.get_coordinates()) == len(orbit_dataframe.columns) - 1 + + def test_generate_profile_states_from_dataframe_defaults_success( + self, + profile_dataframe: pd.DataFrame, + ): + states: list[State] = generate_states_from_dataframe( + profile_dataframe, + reference_frame=Frame.J2000(Theory.IAU_2006), + ) + + for state in states: + assert len(state.get_coordinates()) == len(profile_dataframe.columns) - 1 + + def test_generate_states_from_profile_dataframe_success( + self, + profile_dataframe: pd.DataFrame, + profile_dataframe_position_columns: list[str], + profile_dataframe_velocity_columns: list[str], + profile_dataframe_attitude_columns: list[str], + profile_dataframe_angular_velocity_columns: list[str], + ): + states: list[State] = generate_states_from_dataframe( + dataframe=profile_dataframe, + reference_frame=Frame.J2000(Theory.IAU_2006), + time_column="Timestamp", + position_columns=profile_dataframe_position_columns, + velocity_columns=profile_dataframe_velocity_columns, + attitude_columns=profile_dataframe_attitude_columns, + angular_velocity_columns=profile_dataframe_angular_velocity_columns, + ) + + for state in states: + assert len(state.get_coordinates()) == len(profile_dataframe.columns) - 1 + + def test_generate_states_from_orbit_dataframe_success( + self, + orbit_dataframe: pd.DataFrame, + ): + states: list[State] = generate_states_from_dataframe( + dataframe=orbit_dataframe, + reference_frame=Frame.GCRF(), + time_column="Timestamp", + position_columns=["r_GCRF_x", "r_GCRF_y", "r_GCRF_z"], + velocity_columns=["v_GCRF_x", "v_GCRF_y", "v_GCRF_z"], + ) + + for state in states: + assert len(state.get_coordinates()) == len(orbit_dataframe.columns) - 1 + + def test_generate_states_from_profile_dataframe_success_defined_columns_without_time( + self, + profile_dataframe_indexed_timestamp: pd.DataFrame, + profile_dataframe_position_columns: list[str], + profile_dataframe_velocity_columns: list[str], + profile_dataframe_attitude_columns: list[str], + profile_dataframe_angular_velocity_columns: list[str], + ): + states: list[State] = generate_states_from_dataframe( + dataframe=profile_dataframe_indexed_timestamp, + reference_frame=Frame.J2000(Theory.IAU_2006), + position_columns=profile_dataframe_position_columns, + velocity_columns=profile_dataframe_velocity_columns, + attitude_columns=profile_dataframe_attitude_columns, + angular_velocity_columns=profile_dataframe_angular_velocity_columns, + ) + + for state in states: + assert len(state.get_coordinates()) == len( + profile_dataframe_indexed_timestamp.columns + ) + + def test_generate_states_from_orbit_dataframe_success_defined_columnsout_with_time( + self, + orbit_dataframe_indexed_timestamp: pd.DataFrame, + ): + states: list[State] = generate_states_from_dataframe( + dataframe=orbit_dataframe_indexed_timestamp, + reference_frame=Frame.GCRF(), + position_columns=["r_GCRF_x", "r_GCRF_y", "r_GCRF_z"], + velocity_columns=["v_GCRF_x", "v_GCRF_y", "v_GCRF_z"], + ) + + for state in states: + assert len(state.get_coordinates()) == len( + orbit_dataframe_indexed_timestamp.columns + ) + + def test_generate_dataframe_from_profile_states_success_custom_columns( + self, + profile_states: list[State], + ): + generated_dataframe: pd.DataFrame = generate_dataframe_from_states( + states=profile_states, + time_column="t", + position_columns=["r_1", "r_2", "r_3"], + velocity_columns=["v_1", "v_2", "v_3"], + attitude_columns=["q_1", "q_2", "q_3", "q_4"], + angular_velocity_columns=["w_1", "w_2", "w_3"], + ) + + assert list(generated_dataframe.columns) == [ + "r_1", + "r_2", + "r_3", + "v_1", + "v_2", + "v_3", + "q_1", + "q_2", + "q_3", + "q_4", + "w_1", + "w_2", + "w_3", + ] + + def test_generate_dataframe_from_orbit_states_success_custom_columns( + self, + orbit_states: list[State], + ): + generated_dataframe: pd.DataFrame = generate_dataframe_from_states( + states=orbit_states, + time_column="t", + position_columns=["r_1", "r_2", "r_3"], + velocity_columns=["v_1", "v_2", "v_3"], + ) + + assert list(generated_dataframe.columns) == [ + "r_1", + "r_2", + "r_3", + "v_1", + "v_2", + "v_3", + ] + + def test_generate_dataframe_from_profile_states_success_custom_reference_frame( + self, + profile_states: list[State], + ): + generated_dataframe: pd.DataFrame = generate_dataframe_from_states( + states=profile_states, + reference_frame=Frame.ITRF(), + ) + + assert list(generated_dataframe.columns) == [ + "r_ITRF_x", + "r_ITRF_y", + "r_ITRF_z", + "v_ITRF_x", + "v_ITRF_y", + "v_ITRF_z", + "q_B_ITRF_x", + "q_B_ITRF_y", + "q_B_ITRF_z", + "q_B_ITRF_s", + "w_B_ITRF_in_B_x", + "w_B_ITRF_in_B_y", + "w_B_ITRF_in_B_z", + ] + + def test_generate_dataframe_from_orbit_states_success_set_time_index_disabled( + self, + orbit_states: list[State], + ): + generated_dataframe: pd.DataFrame = generate_dataframe_from_states( + states=orbit_states, + time_column="t", + position_columns=["r_1", "r_2", "r_3"], + velocity_columns=["v_1", "v_2", "v_3"], + set_time_index=False, + ) + + assert list(generated_dataframe.columns) == [ + "t", + "r_1", + "r_2", + "r_3", + "v_1", + "v_2", + "v_3", + ] + + def test_generate_dataframe_from_profile_states_success_set_time_index_disabled( + self, + profile_states: list[State], + ): + generated_dataframe: pd.DataFrame = generate_dataframe_from_states( + states=profile_states, + time_column="t", + position_columns=["r_1", "r_2", "r_3"], + velocity_columns=["v_1", "v_2", "v_3"], + attitude_columns=["q_1", "q_2", "q_3", "q_4"], + angular_velocity_columns=["w_1", "w_2", "w_3"], + set_time_index=False, + ) + + assert list(generated_dataframe.columns) == [ + "t", + "r_1", + "r_2", + "r_3", + "v_1", + "v_2", + "v_3", + "q_1", + "q_2", + "q_3", + "q_4", + "w_1", + "w_2", + "w_3", + ] + + +class TestProfileDataframe: + @pytest.fixture + def environment(self) -> Environment: + return Environment.default() + + @pytest.fixture + def epoch(self) -> Instant: + return Instant.date_time(DateTime(2020, 1, 1, 0, 0, 0), Scale.UTC) + + @pytest.fixture + def dataframe(self, epoch: Instant) -> pd.DataFrame: + return pd.DataFrame( + [ + { + "Timestamp": coerce_to_datetime(epoch), + "r_GCRF_x": 1.0, + "r_GCRF_y": 2.0, + "r_GCRF_z": 3.0, + "v_GCRF_x": 4.0, + "v_GCRF_y": 5.0, + "v_GCRF_z": 6.0, + "q_B_GCRF_x": 0.0, + "q_B_GCRF_y": 0.0, + "q_B_GCRF_z": 0.0, + "q_B_GCRF_s": 1.0, + "w_B_GCRF_in_B_x": 0.0, + "w_B_GCRF_in_B_y": 0.0, + "w_B_GCRF_in_B_z": 0.0, + }, + { + "Timestamp": coerce_to_datetime(epoch + Duration.minutes(1.0)), + "r_GCRF_x": 11.0, + "r_GCRF_y": 12.0, + "r_GCRF_z": 13.0, + "v_GCRF_x": 14.0, + "v_GCRF_y": 15.0, + "v_GCRF_z": 16.0, + "q_B_GCRF_x": 0.0, + "q_B_GCRF_y": 0.0, + "q_B_GCRF_z": 1.0, + "q_B_GCRF_s": 0.0, + "w_B_GCRF_in_B_x": 1.0, + "w_B_GCRF_in_B_y": 1.0, + "w_B_GCRF_in_B_z": 1.0, + }, + ] + ) + + @pytest.fixture + def profile(self, dataframe: pd.DataFrame) -> Profile: + return generate_profile_from_dataframe( + dataframe=dataframe, + ) + + @pytest.fixture + def dataframe_indexed_timestamp(self, dataframe: pd.DataFrame) -> pd.DataFrame: + dataframe.set_index("Timestamp", inplace=True) + return dataframe + + def test_generate_profile_from_dataframe_success( + self, + epoch: Instant, + dataframe: pd.DataFrame, + ): + profile: Profile = generate_profile_from_dataframe( + dataframe=dataframe, + ) + + assert profile is not None + + np.testing.assert_allclose( + profile.get_state_at(epoch).get_position().get_coordinates(), + np.array((1.0, 2.0, 3.0)), + atol=1e-8, + ) + + np.testing.assert_allclose( + profile.get_state_at(epoch).get_velocity().get_coordinates(), + np.array((4.0, 5.0, 6.0)), + atol=1e-8, + ) + + np.testing.assert_allclose( + profile.get_state_at(epoch).get_attitude().to_vector(Quaternion.Format.XYZS), + np.array((0.0, 0.0, 0.0, 1.0)), + atol=1e-8, + ) + + np.testing.assert_allclose( + profile.get_state_at(epoch).get_angular_velocity(), + np.array((0.0, 0.0, 0.0)), + atol=1e-8, + ) + + np.testing.assert_allclose( + profile.get_state_at(epoch + Duration.minutes(1.0)) + .get_position() + .get_coordinates(), + np.array((11.0, 12.0, 13.0)), + atol=1e-8, + ) + + np.testing.assert_allclose( + profile.get_state_at(epoch + Duration.minutes(1.0)) + .get_velocity() + .get_coordinates(), + np.array((14.0, 15.0, 16.0)), + atol=1e-8, + ) + + np.testing.assert_allclose( + profile.get_state_at(epoch + Duration.minutes(1.0)) + .get_attitude() + .to_vector(Quaternion.Format.XYZS), + np.array((0.0, 0.0, 1.0, 0.0)), + atol=1e-8, + ) + + np.testing.assert_allclose( + profile.get_state_at(epoch + Duration.minutes(1.0)).get_angular_velocity(), + np.array((1.0, 1.0, 1.0)), + atol=1e-8, + ) + + assert profile.get_state_at(epoch).get_frame() == Frame.GCRF() + + def test_generate_profile_from_dataframe_success_defined_columns_with_time( + self, + epoch: Instant, + dataframe: pd.DataFrame, + ): + profile: Profile = generate_profile_from_dataframe( + dataframe=dataframe, + time_column="Timestamp", + position_columns=["r_GCRF_x", "r_GCRF_y", "r_GCRF_z"], + velocity_columns=["v_GCRF_x", "v_GCRF_y", "v_GCRF_z"], + attitude_columns=["q_B_GCRF_x", "q_B_GCRF_y", "q_B_GCRF_z", "q_B_GCRF_s"], + angular_velocity_columns=[ + "w_B_GCRF_in_B_x", + "w_B_GCRF_in_B_y", + "w_B_GCRF_in_B_z", + ], + ) + + assert profile is not None + + np.testing.assert_allclose( + profile.get_state_at(epoch).get_position().get_coordinates(), + np.array((1.0, 2.0, 3.0)), + atol=1e-8, + ) + + np.testing.assert_allclose( + profile.get_state_at(epoch).get_velocity().get_coordinates(), + np.array((4.0, 5.0, 6.0)), + atol=1e-8, + ) + + np.testing.assert_allclose( + profile.get_state_at(epoch).get_attitude().to_vector(Quaternion.Format.XYZS), + np.array((0.0, 0.0, 0.0, 1.0)), + atol=1e-8, + ) + + np.testing.assert_allclose( + profile.get_state_at(epoch).get_angular_velocity(), + np.array((0.0, 0.0, 0.0)), + atol=1e-8, + ) + + np.testing.assert_allclose( + profile.get_state_at(epoch + Duration.minutes(1.0)) + .get_position() + .get_coordinates(), + np.array((11.0, 12.0, 13.0)), + atol=1e-8, + ) + + np.testing.assert_allclose( + profile.get_state_at(epoch + Duration.minutes(1.0)) + .get_velocity() + .get_coordinates(), + np.array((14.0, 15.0, 16.0)), + atol=1e-8, + ) + + np.testing.assert_allclose( + profile.get_state_at(epoch + Duration.minutes(1.0)) + .get_attitude() + .to_vector(Quaternion.Format.XYZS), + np.array((0.0, 0.0, 1.0, 0.0)), + atol=1e-8, + ) + + np.testing.assert_allclose( + profile.get_state_at(epoch + Duration.minutes(1.0)).get_angular_velocity(), + np.array((1.0, 1.0, 1.0)), + atol=1e-8, + ) + + assert profile.get_state_at(epoch).get_frame() == Frame.GCRF() + + def test_generate_profile_from_dataframe_success_defined_columns_without_time( + self, + epoch: Instant, + dataframe_indexed_timestamp: pd.DataFrame, + ): + profile: Profile = generate_profile_from_dataframe( + dataframe=dataframe_indexed_timestamp, + position_columns=["r_GCRF_x", "r_GCRF_y", "r_GCRF_z"], + velocity_columns=["v_GCRF_x", "v_GCRF_y", "v_GCRF_z"], + attitude_columns=["q_B_GCRF_x", "q_B_GCRF_y", "q_B_GCRF_z", "q_B_GCRF_s"], + angular_velocity_columns=[ + "w_B_GCRF_in_B_x", + "w_B_GCRF_in_B_y", + "w_B_GCRF_in_B_z", + ], + ) + + assert profile is not None + + np.testing.assert_allclose( + profile.get_state_at(epoch).get_position().get_coordinates(), + np.array((1.0, 2.0, 3.0)), + atol=1e-8, + ) + + np.testing.assert_allclose( + profile.get_state_at(epoch).get_velocity().get_coordinates(), + np.array((4.0, 5.0, 6.0)), + atol=1e-8, + ) + + np.testing.assert_allclose( + profile.get_state_at(epoch).get_attitude().to_vector(Quaternion.Format.XYZS), + np.array((0.0, 0.0, 0.0, 1.0)), + atol=1e-8, + ) + + np.testing.assert_allclose( + profile.get_state_at(epoch).get_angular_velocity(), + np.array((0.0, 0.0, 0.0)), + atol=1e-8, + ) + + np.testing.assert_allclose( + profile.get_state_at(epoch + Duration.minutes(1.0)) + .get_position() + .get_coordinates(), + np.array((11.0, 12.0, 13.0)), + atol=1e-8, + ) + + np.testing.assert_allclose( + profile.get_state_at(epoch + Duration.minutes(1.0)) + .get_velocity() + .get_coordinates(), + np.array((14.0, 15.0, 16.0)), + atol=1e-8, + ) + + np.testing.assert_allclose( + profile.get_state_at(epoch + Duration.minutes(1.0)) + .get_attitude() + .to_vector(Quaternion.Format.XYZS), + np.array((0.0, 0.0, 1.0, 0.0)), + atol=1e-8, + ) + + np.testing.assert_allclose( + profile.get_state_at(epoch + Duration.minutes(1.0)).get_angular_velocity(), + np.array((1.0, 1.0, 1.0)), + atol=1e-8, + ) + + assert profile.get_state_at(epoch).get_frame() == Frame.GCRF() + + def test_generate_profile_from_dataframe_success_no_angular_velocity_columns( + self, + epoch: Instant, + dataframe_indexed_timestamp: pd.DataFrame, + ): + dataframe_indexed_timestamp.drop( + ["w_B_GCRF_in_B_x", "w_B_GCRF_in_B_y", "w_B_GCRF_in_B_z"], + axis=1, + inplace=True, + ) + + profile: Profile = generate_profile_from_dataframe( + dataframe=dataframe_indexed_timestamp, + position_columns=["r_GCRF_x", "r_GCRF_y", "r_GCRF_z"], + velocity_columns=["v_GCRF_x", "v_GCRF_y", "v_GCRF_z"], + attitude_columns=["q_B_GCRF_x", "q_B_GCRF_y", "q_B_GCRF_z", "q_B_GCRF_s"], + ) + + assert profile is not None + + np.testing.assert_allclose( + profile.get_state_at(epoch).get_position().get_coordinates(), + np.array((1.0, 2.0, 3.0)), + atol=1e-8, + ) + + np.testing.assert_allclose( + profile.get_state_at(epoch).get_velocity().get_coordinates(), + np.array((4.0, 5.0, 6.0)), + atol=1e-8, + ) + + np.testing.assert_allclose( + profile.get_state_at(epoch).get_attitude().to_vector(Quaternion.Format.XYZS), + np.array((0.0, 0.0, 0.0, 1.0)), + atol=1e-8, + ) + + np.testing.assert_allclose( + profile.get_state_at(epoch + Duration.minutes(1.0)) + .get_position() + .get_coordinates(), + np.array((11.0, 12.0, 13.0)), + atol=1e-8, + ) + + np.testing.assert_allclose( + profile.get_state_at(epoch + Duration.minutes(1.0)) + .get_velocity() + .get_coordinates(), + np.array((14.0, 15.0, 16.0)), + atol=1e-8, + ) + + np.testing.assert_allclose( + profile.get_state_at(epoch + Duration.minutes(1.0)) + .get_attitude() + .to_vector(Quaternion.Format.XYZS), + np.array((0.0, 0.0, 1.0, 0.0)), + atol=1e-8, + ) + + assert profile.get_state_at(epoch).get_frame() == Frame.GCRF() + + def test_generate_dataframe_from_profile_success( + self, + epoch: Instant, + profile: Profile, + dataframe_indexed_timestamp: pd.DataFrame, + ): + generated_dataframe: pd.DataFrame = generate_dataframe_from_profile( + profile=profile, + instants=[ + epoch, + epoch + Duration.minutes(1.0), + ], + ) + + pd.testing.assert_frame_equal(generated_dataframe, dataframe_indexed_timestamp) + + def test_generate_dataframe_from_profile_success_custom_columns( + self, + epoch: Instant, + profile: Profile, + ): + generated_dataframe: pd.DataFrame = generate_dataframe_from_profile( + profile=profile, + instants=[ + epoch, + epoch + Duration.minutes(1.0), + ], + time_column="t", + position_columns=["r_1", "r_2", "r_3"], + velocity_columns=["v_1", "v_2", "v_3"], + attitude_columns=["q_1", "q_2", "q_3", "q_4"], + angular_velocity_columns=["w_1", "w_2", "w_3"], + ) + + assert list(generated_dataframe.columns) == [ + "r_1", + "r_2", + "r_3", + "v_1", + "v_2", + "v_3", + "q_1", + "q_2", + "q_3", + "q_4", + "w_1", + "w_2", + "w_3", + ] + + def test_generate_dataframe_from_profile_success_custom_reference_frame( + self, + epoch: Instant, + profile: Profile, + ): + generated_dataframe: pd.DataFrame = generate_dataframe_from_profile( + profile=profile, + instants=[ + epoch, + epoch + Duration.minutes(1.0), + ], + reference_frame=Frame.ITRF(), + ) + + assert list(generated_dataframe.columns) == [ + "r_ITRF_x", + "r_ITRF_y", + "r_ITRF_z", + "v_ITRF_x", + "v_ITRF_y", + "v_ITRF_z", + "q_B_ITRF_x", + "q_B_ITRF_y", + "q_B_ITRF_z", + "q_B_ITRF_s", + "w_B_ITRF_in_B_x", + "w_B_ITRF_in_B_y", + "w_B_ITRF_in_B_z", + ] + + def test_generate_dataframe_from_profile_success_set_time_index_disabled( + self, + epoch: Instant, + profile: Profile, + ): + generated_dataframe: pd.DataFrame = generate_dataframe_from_profile( + profile=profile, + instants=[ + epoch, + epoch + Duration.minutes(1.0), + ], + time_column="t", + position_columns=["r_1", "r_2", "r_3"], + velocity_columns=["v_1", "v_2", "v_3"], + attitude_columns=["q_1", "q_2", "q_3", "q_4"], + angular_velocity_columns=["w_1", "w_2", "w_3"], + set_time_index=False, + ) + + assert list(generated_dataframe.columns) == [ + "t", + "r_1", + "r_2", + "r_3", + "v_1", + "v_2", + "v_3", + "q_1", + "q_2", + "q_3", + "q_4", + "w_1", + "w_2", + "w_3", + ] diff --git a/bindings/python/test/test_display.py b/bindings/python/test/test_display.py index 9754214fd..5de0b8ab1 100644 --- a/bindings/python/test/test_display.py +++ b/bindings/python/test/test_display.py @@ -19,18 +19,17 @@ from ostk.astrodynamics import display from ostk.astrodynamics.access import Generator as AccessGenerator from ostk.astrodynamics.trajectory import Orbit -from ostk.astrodynamics.trajectory import State from ostk.astrodynamics.trajectory.orbit.model import SGP4 from ostk.astrodynamics.trajectory.orbit.model.sgp4 import TLE class TestDisplay: - def test_accesses_plot(self, state: State): + def test_accesses_plot(self): start_instant: Instant = Instant.date_time( DateTime(2023, 1, 3, 0, 0, 0), Scale.UTC, ) - duration: Duration = Duration.days(7.0) + duration: Duration = Duration.hours(12.0) step: Duration = Duration.seconds(10.0) tolerance: Duration = Duration.seconds(1.0) diff --git a/bindings/python/test/trajectory/state/test_numerical_solver.py b/bindings/python/test/trajectory/state/test_numerical_solver.py index 7852e5423..8e62741b5 100644 --- a/bindings/python/test/trajectory/state/test_numerical_solver.py +++ b/bindings/python/test/trajectory/state/test_numerical_solver.py @@ -191,7 +191,7 @@ def test_integrate_time( initial_state: State, numerical_solver: NumericalSolver, ): - duration_seconds: float = 100.0 + duration_seconds: float = 10.0 end_instant: Instant = initial_state.get_instant() + Duration.seconds( duration_seconds ) @@ -205,7 +205,7 @@ def test_integrate_time( end_instants: list[Instant] = [ initial_state.get_instant() + Duration.seconds(duration) - for duration in np.arange(600.0, 1000.0, 50.0) + for duration in np.arange(60.0, 100.0, 20.0) ] states: list[State] = numerical_solver.integrate_time( initial_state, end_instants, oscillator diff --git a/bindings/python/tools/python/ostk/astrodynamics/dataframe.py b/bindings/python/tools/python/ostk/astrodynamics/dataframe.py new file mode 100644 index 000000000..4cc8810cd --- /dev/null +++ b/bindings/python/tools/python/ostk/astrodynamics/dataframe.py @@ -0,0 +1,477 @@ +# Copyright © Loft Orbital Solutions Inc. + +import numpy as np + +import pandas as pd + +from ostk.mathematics.curve_fitting import Interpolator + +from ostk.physics.environment.object import Celestial +from ostk.physics.environment.object.celestial import Earth +from ostk.physics.time import Instant +from ostk.physics.coordinate import Frame + +from ostk.astrodynamics.converters import coerce_to_datetime +from ostk.astrodynamics.converters import coerce_to_instant +from ostk.astrodynamics.trajectory import State +from ostk.astrodynamics.trajectory import StateBuilder +from ostk.astrodynamics.trajectory import Orbit +from ostk.astrodynamics.trajectory.state import CoordinateSubset +from ostk.astrodynamics.trajectory.state.coordinate_subset import CartesianPosition +from ostk.astrodynamics.trajectory.state.coordinate_subset import CartesianVelocity +from ostk.astrodynamics.trajectory.state.coordinate_subset import AttitudeQuaternion +from ostk.astrodynamics.trajectory.state.coordinate_subset import AngularVelocity +from ostk.astrodynamics.trajectory.orbit.model import Tabulated as TabulatedOrbit +from ostk.astrodynamics.flight import Profile +from ostk.astrodynamics.flight.profile.model import Tabulated as TabulatedProfile + + +DEFAULT_REFERENCE_FRAME: Frame = Frame.GCRF() +DEFAULT_TIME_COLUMN: str = "Timestamp" +DEFAULT_POSITION_COLUMNS_FORMAT: list[str] = [ + "r_{frame}_x", + "r_{frame}_y", + "r_{frame}_z", +] +DEFAULT_VELOCITY_COLUMNS_FORMAT: list[str] = [ + "v_{frame}_x", + "v_{frame}_y", + "v_{frame}_z", +] +DEFAULT_ATTITUDE_COLUMNS_FORMAT: list[str] = [ + "q_B_{frame}_x", + "q_B_{frame}_y", + "q_B_{frame}_z", + "q_B_{frame}_s", +] +DEFAULT_ANGULAR_VELOCITY_COLUMNS_FORMAT: list[str] = [ + "w_B_{frame}_in_B_x", + "w_B_{frame}_in_B_y", + "w_B_{frame}_in_B_z", +] +CARTESIAN_POSITION_SUBSET: CartesianPosition = CartesianPosition.default() +CARTESIAN_VELOCITY_SUBSET: CartesianVelocity = CartesianVelocity.default() +ATTITUDE_QUATERNION_SUBSET: AttitudeQuaternion = AttitudeQuaternion.default() +ANGULAR_VELOCITY_SUBSET: AngularVelocity = AngularVelocity.default() + +DEFAULT_INTERPOLATION_TYPE: Interpolator.Type = Interpolator.Type.BarycentricRational + + +def generate_column_names( + reference_frame: Frame, + time_column: str | None = None, + position_columns: list[str] | None = None, + velocity_columns: list[str] | None = None, + attitude_columns: list[str] | None = None, + angular_velocity_columns: list[str] | None = None, +) -> tuple[str, list[str], list[str], list[str], list[str]]: + """ + Generate column names for a DataFrame containing orbit data. + + Args: + reference_frame (Frame): Reference frame. + time_column (str | None): Name of the column containing the time data in [UTC]. + position_columns (list[str] | None): List of column names containing the position data in [m]. + velocity_columns (list[str] | None): List of column names containing the velocity data in [m/s]. + attitude_columns (list[str] | None): List of column names containing the attitude data in [x, y, z, s] form. + angular_velocity_columns (list[str] | None): List of column names containing the angular velocity data in [rad/s]. + + Returns: + tuple[str, list[str], list[str], list[str], list[str]: Tuple containing the column names. + """ + reference_frame_name: str = reference_frame.get_name() + + return ( + time_column or DEFAULT_TIME_COLUMN, + position_columns + or [ + column.format(frame=reference_frame_name) + for column in DEFAULT_POSITION_COLUMNS_FORMAT + ], + velocity_columns + or [ + column.format(frame=reference_frame_name) + for column in DEFAULT_VELOCITY_COLUMNS_FORMAT + ], + attitude_columns + or [ + column.format(frame=reference_frame_name) + for column in DEFAULT_ATTITUDE_COLUMNS_FORMAT + ], + angular_velocity_columns + or [ + column.format(frame=reference_frame_name) + for column in DEFAULT_ANGULAR_VELOCITY_COLUMNS_FORMAT + ], + ) + + +def generate_states_from_dataframe( + dataframe: pd.DataFrame, + reference_frame: Frame | None = None, + time_column: str | None = None, + position_columns: list[str] | None = None, + velocity_columns: list[str] | None = None, + attitude_columns: list[str] | None = None, + angular_velocity_columns: list[str] | None = None, + output_frame: Frame | None = None, +) -> list[State]: + """ + Generate a list of OSTk States from a Pandas DataFrame. + + Args: + dataframe (pd.DataFrame): Pandas DataFrame containing the orbit data. + reference_frame (Frame | None, optional): Reference frame of the states in the dataframe. + time_column (str | None, optional): Name of the column containing the time data in [UTC]. + position_columns (list[str] | None, optional): List of column names containing the position data in [m]. + velocity_columns (list[str] | None, optional): List of column names containing the velocity data in [m/s]. + attitude_columns (list[str] | None, optional): List of column names containing the attitude data in [x, y, z, s] form. + angular_velocity_columns (list[str] | None, optional): List of column names containing the angular velocity data in [rad/s]. + output_frame (Frame | None, optional): Output frame for the states. + + Returns: + list[State]: List of OSTk States. + """ + + reference_frame = reference_frame or DEFAULT_REFERENCE_FRAME + + output_frame = output_frame or reference_frame + + ( + time_column, + position_columns, + velocity_columns, + attitude_columns, + angular_velocity_columns, + ) = generate_column_names( + reference_frame=reference_frame, + time_column=time_column, + position_columns=position_columns, + velocity_columns=velocity_columns, + attitude_columns=attitude_columns, + angular_velocity_columns=angular_velocity_columns, + ) + + if not set(position_columns).issubset(dataframe.columns): + raise ValueError("Position columns are not present in the DataFrame.") + + if not set(velocity_columns).issubset(dataframe.columns): + raise ValueError("Velocity columns are not present in the DataFrame.") + + # check if the dataframe contains a timestamp column + has_timestamp: bool = time_column in dataframe.columns + + # check if the dataframe contains attitude + has_attitude: bool = set(attitude_columns).issubset(dataframe.columns) + + # check if the dataframe contains angular velocity + has_angular_velocity: bool = set(angular_velocity_columns).issubset(dataframe.columns) + + coordinate_subsets = [ + CARTESIAN_POSITION_SUBSET, + CARTESIAN_VELOCITY_SUBSET, + ] + + if has_attitude: + coordinate_subsets.extend([ATTITUDE_QUATERNION_SUBSET]) + + if has_angular_velocity: + coordinate_subsets.extend([ANGULAR_VELOCITY_SUBSET]) + + state_builder: StateBuilder = StateBuilder( + frame=reference_frame, + coordinate_subsets=coordinate_subsets, + ) + + states: list[State] = [] + for index, row in dataframe.to_dict("index").items(): + + coordinates: list[float] = [ + *[row[column] for column in position_columns], + *[row[column] for column in velocity_columns], + ] + + if has_attitude: + coordinates.extend([row[column] for column in attitude_columns]) + + if has_angular_velocity: + coordinates.extend([row[column] for column in angular_velocity_columns]) + + states.append( + state_builder.build( + instant=coerce_to_instant( + row[time_column] if has_timestamp else index, + ), + coordinates=coordinates, + ).in_frame(output_frame) + ) + + return states + + +def generate_dataframe_from_states( + states: list[State], + reference_frame: Frame | None = None, + time_column: str | None = None, + position_columns: list[str] | None = None, + velocity_columns: list[str] | None = None, + attitude_columns: list[str] | None = None, + angular_velocity_columns: list[str] | None = None, + set_time_index: bool = True, +) -> pd.DataFrame: + """ + Generate a Pandas DataFrame from a list of OSTk States. + + Args: + states (list[State]): List of OSTk States. + reference_frame (Frame | None, optional): The desired reference frame. + time_column (str | None, optional): Name of the column containing the time data in [UTC]. + position_columns (list[str] | None, optional): List of column names containing the position data in [m]. + velocity_columns (list[str] | None, optional): List of column names containing the velocity data in [m/s]. + attitude_columns (list[str] | None, optional): List of column names containing the attitude data in [x, y, z, s] form. + angular_velocity_columns (list[str] | None, optional): List of column names containing the angular velocity data in [rad/s]. + set_time_index (bool, optional): Whether to set the time column as the index. Defaults to True. + + Returns: + pd.DataFrame: Pandas DataFrame containing the orbit data. + """ + has_attitude: bool = states[0].has_subset(ATTITUDE_QUATERNION_SUBSET) + has_angular_velocity: bool = states[0].has_subset(ANGULAR_VELOCITY_SUBSET) + + reference_frame = reference_frame or DEFAULT_REFERENCE_FRAME + + ( + time_column, + position_columns, + velocity_columns, + attitude_columns, + angular_velocity_columns, + ) = generate_column_names( + reference_frame=reference_frame, + time_column=time_column, + position_columns=position_columns, + velocity_columns=velocity_columns, + attitude_columns=attitude_columns, + angular_velocity_columns=angular_velocity_columns, + ) + + if len(position_columns) != 3: + raise ValueError("Position columns must be a list of length 3.") + + if len(velocity_columns) != 3: + raise ValueError("Velocity columns must be a list of length 3.") + + if len(attitude_columns) != 4: + raise ValueError("Attitude columns must be a list of length 4.") + + if len(angular_velocity_columns) != 3: + raise ValueError("Angular velocity columns must be a list of length 3.") + + data: list[dict] = [] + + for state in states: + state = state.in_frame(reference_frame) + + datum: dict = { + time_column: coerce_to_datetime(state.get_instant()), + **_get_entry_from_state(state, position_columns, CARTESIAN_POSITION_SUBSET), + **_get_entry_from_state(state, velocity_columns, CARTESIAN_VELOCITY_SUBSET), + } + + if has_attitude: + datum.update( + _get_entry_from_state(state, attitude_columns, ATTITUDE_QUATERNION_SUBSET) + ) + if has_angular_velocity: + datum.update( + _get_entry_from_state( + state, angular_velocity_columns, ANGULAR_VELOCITY_SUBSET + ) + ) + + data.append(datum) + + dataframe: pd.DataFrame = pd.DataFrame(data) + if set_time_index: + dataframe.set_index(time_column, inplace=True) + + return dataframe + + +def generate_orbit_from_dataframe( + dataframe: pd.DataFrame, + central_body: Celestial = Earth.default(), + reference_frame: Frame | None = None, + time_column: str | None = None, + position_columns: list[str] | None = None, + velocity_columns: list[str] | None = None, + initial_revolution_number: int = 1, + interpolation_type: Interpolator.Type | None = None, + output_frame: Frame | None = None, +) -> Orbit: + """ + Generate an OSTk Orbit from a Pandas DataFrame. + + Args: + dataframe (pd.DataFrame): Pandas DataFrame containing the orbit data. + central_body (Celestial, optional): Celestial object around which the Orbit is defined. Defaults to Earth. + reference_frame (Frame | None, optional): Reference frame. + time_column (str | None, optional): Name of the column containing the time data in [UTC]. + position_columns (list[str] | None, optional): List of column names containing the position data in [m]. + velocity_columns (list[str] | None, optional): List of column names containing the velocity data in [m/s]. + initial_revolution_number (int, optional): Initial revolution number. Defaults to 1. + interpolation_type (Interpolator.Type | None, optional): Interpolation type. + output_frame (Frame | None, optional): Output frame for the states. + + Returns: + Orbit: OSTk Orbit. + """ + + return Orbit( + model=TabulatedOrbit( + states=generate_states_from_dataframe( + dataframe=dataframe, + time_column=time_column, + position_columns=position_columns, + velocity_columns=velocity_columns, + reference_frame=reference_frame, + output_frame=output_frame, + ), + initial_revolution_number=initial_revolution_number, + interpolation_type=interpolation_type or DEFAULT_INTERPOLATION_TYPE, + ), + celestial_object=central_body, + ) + + +def generate_dataframe_from_orbit( + orbit: Orbit, + instants: list[Instant], + reference_frame: Frame | None = None, + time_column: str | None = None, + position_columns: list[str] | None = None, + velocity_columns: list[str] | None = None, + set_time_index: bool = True, +) -> pd.DataFrame: + """ + Generate a Pandas DataFrame from an OSTk Orbit. + + Args: + orbit (Orbit): OSTk Orbit. + instants (list[Instant]): List of instants at which the Orbit is to be evaluated. + reference_frame (Frame | None, optional): Reference frame. + time_column (str | None, optional): Name of the column containing the time data in [UTC]. + position_columns (list[str] | None, optional): List of column names containing the position data in [m]. + velocity_columns (list[str] | None, optional): List of column names containing the velocity data in [m/s]. + set_time_index (bool, optional): Whether to set the time column as the DataFrame index. Defaults to True. + + Returns: + pd.DataFrame: Pandas DataFrame containing the Orbit data. + """ + + states: list[State] = orbit.get_states_at(instants) + + return generate_dataframe_from_states( + states, + reference_frame=reference_frame, + time_column=time_column, + position_columns=position_columns, + velocity_columns=velocity_columns, + set_time_index=set_time_index, + ) + + +def generate_profile_from_dataframe( + dataframe: pd.DataFrame, + reference_frame: Frame | None = None, + time_column: str | None = None, + position_columns: list[str] | None = None, + velocity_columns: list[str] | None = None, + attitude_columns: list[str] | None = None, + angular_velocity_columns: list[str] | None = None, + output_frame: Frame | None = None, +) -> Profile: + """ + Generate an OSTk Profile from a Pandas DataFrame. + + Args: + dataframe (pd.DataFrame): Pandas DataFrame containing the Profile data. + reference_frame (Frame | None, optional): Reference frame. + time_column (str | None, optional): Name of the column containing the time data in [UTC]. + position_columns (list[str] | None, optional): List of column names containing the position data in [m]. + velocity_columns (list[str] | None, optional): List of column names containing the velocity data in [m/s]. + attitude_columns (list[str] | None, optional): List of column names containing the attitude data in [x, y, z, s] form. + angular_velocity_columns (list[str] | None, optional): List of column names containing the angular velocity data in [rad/s]. + output_frame (Frame | None, optional): Output frame for the states. + + Returns: + Profile: OSTk Profile. + """ + + return Profile( + model=TabulatedProfile( + states=generate_states_from_dataframe( + dataframe=dataframe, + reference_frame=reference_frame, + time_column=time_column, + position_columns=position_columns, + velocity_columns=velocity_columns, + attitude_columns=attitude_columns, + angular_velocity_columns=angular_velocity_columns, + output_frame=output_frame, + ), + ), + ) + + +def generate_dataframe_from_profile( + profile: Profile, + instants: list[Instant], + reference_frame: Frame | None = None, + time_column: str | None = None, + position_columns: list[str] | None = None, + velocity_columns: list[str] | None = None, + attitude_columns: list[str] | None = None, + angular_velocity_columns: list[str] | None = None, + set_time_index: bool = True, +) -> pd.DataFrame: + """ + Generate a Pandas DataFrame from an OSTk Profile. + + Args: + profile (Profile): OSTk Profile. + instants (list[Instant]): List of instants at which the Profile is to be evaluated. + reference_frame (Frame | None, optional): Reference frame. + time_column (str | None, optional): Name of the column containing the time data in [UTC]. + position_columns (list[str] | None, optional): List of column names containing the position data in [m]. + velocity_columns (list[str] | None, optional): List of column names containing the velocity data in [m/s]. + attitude_columns (list[str] | None, optional): List of column names containing the attitude data in [x, y, z, s] form. + angular_velocity_columns (list[str] | None, optional): List of column names containing the angular velocity data in [rad/s]. + set_time_index (bool, optional): Whether to set the time column as the DataFrame index. Defaults to True. + + Returns: + pd.DataFrame: Pandas DataFrame containing the Profile data. + """ + + states: list[State] = profile.get_states_at(instants) + + return generate_dataframe_from_states( + states, + reference_frame=reference_frame, + time_column=time_column, + position_columns=position_columns, + velocity_columns=velocity_columns, + attitude_columns=attitude_columns, + angular_velocity_columns=angular_velocity_columns, + set_time_index=set_time_index, + ) + + +def _get_entry_from_state( + state: State, columns: list[str], subset: CoordinateSubset +) -> dict: + coordinates: np.ndarray = state.extract_coordinate(subset) + return { + column.format(frame=state.get_frame().get_name()): coordinates[idx] + for idx, column in enumerate(columns) + } diff --git a/bindings/python/tools/python/ostk/astrodynamics/utilities.py b/bindings/python/tools/python/ostk/astrodynamics/utilities.py index e9ba9795f..55fe24546 100644 --- a/bindings/python/tools/python/ostk/astrodynamics/utilities.py +++ b/bindings/python/tools/python/ostk/astrodynamics/utilities.py @@ -134,7 +134,7 @@ def convert_state( state: trajectory.State, ) -> tuple[str, float, float, float, float, float, float, float, float, float]: """ - Convert an input (Instant, State) into dataframe-ready values. + Convert a State into dataframe-ready values. """ lla: LLA = LLA.cartesian(