Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[uss_qualifier/scenarios/netrid] Move recent positions checks to display_data_evaluator from common_data_dictionary #863

Merged
merged 1 commit into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import datetime
import math
from typing import List, Optional

import s2sphere
from arrow import ParserError
from implicitdict import StringBasedDateTime
from uas_standards.ansi_cta_2063_a import SerialNumber
Expand All @@ -25,22 +23,15 @@
)

from monitoring.monitorlib.fetch.rid import (
FetchedFlights,
FlightDetails,
)
from monitoring.monitorlib.fetch.rid import Flight, Position
from monitoring.monitorlib.geo import validate_lat, validate_lng, Altitude, LatLngPoint
from monitoring.monitorlib.rid import RIDVersion
from monitoring.uss_qualifier.common_data_definitions import Severity
from monitoring.uss_qualifier.configurations.configuration import ParticipantID
from monitoring.uss_qualifier.resources.netrid.evaluation import EvaluationConfiguration
from monitoring.uss_qualifier.scenarios.scenario import TestScenarioType, PendingCheck

# SP responses to /flights endpoint's p99 should be below this:
SP_FLIGHTS_RESPONSE_TIME_TOLERANCE_SEC = 3
NET_MAX_NEAR_REAL_TIME_DATA_PERIOD_SEC = 60
_POSITION_TIMESTAMP_MAX_AGE_SEC = (
NET_MAX_NEAR_REAL_TIME_DATA_PERIOD_SEC + SP_FLIGHTS_RESPONSE_TIME_TOLERANCE_SEC
)
from monitoring.uss_qualifier.scenarios.scenario import TestScenarioType


class RIDCommonDictionaryEvaluator(object):
Expand All @@ -54,34 +45,17 @@ def __init__(
self._test_scenario = test_scenario
self._rid_version = rid_version

def evaluate_sp_flights(
def evaluate_sp_flight(
self,
requested_area: s2sphere.LatLngRect,
observed_flights: FetchedFlights,
participants: List[str],
observed_flight: Flight,
participant_id: ParticipantID,
):
"""Implements fragment documented in `common_dictionary_evaluator_sp_flight.md`."""

for url, uss_flights in observed_flights.uss_flight_queries.items():
# For the timing checks, we want to look at the flights relative to the query
# they came from, as they may be provided from different SP's.
for f in uss_flights.flights:
self.evaluate_sp_flight_recent_positions_times(
f,
uss_flights.query.response.reported.datetime,
participants,
)

self.evaluate_sp_flight_recent_positions_crossing_area_boundary(
requested_area, f, participants
)

for f in observed_flights.flights:
# Evaluate on all flights regardless of where they came from
self._evaluate_operational_status(
f.operational_status,
participants,
)
self._evaluate_operational_status(
observed_flight.operational_status,
[participant_id],
)

def evaluate_dp_flight(
self,
Expand Down Expand Up @@ -121,95 +95,6 @@ def evaluate_dp_flight(
participants,
)

def _evaluate_recent_position_time(
self, p: Position, query_time: datetime.datetime, check: PendingCheck
):
"""Check that the position's timestamp is at most 60 seconds before the request time."""
if (query_time - p.time).total_seconds() > _POSITION_TIMESTAMP_MAX_AGE_SEC:
check.record_failed(
"A Position timestamp was older than the tolerance.",
details=f"Position timestamp: {p.time}, query time: {query_time}",
severity=Severity.Medium,
)

def evaluate_sp_flight_recent_positions_times(
self, f: Flight, query_time: datetime.datetime, participants: List[str]
):
with self._test_scenario.check(
"Recent positions timestamps", participants
) as check:
for p in f.recent_positions:
self._evaluate_recent_position_time(p, query_time, check)

def _chronological_positions(self, f: Flight) -> List[s2sphere.LatLng]:
"""
Returns the recent positions of the flight, ordered by time with the oldest first, and the most recent last.
"""
return [
s2sphere.LatLng.from_degrees(p.lat, p.lng)
for p in sorted(f.recent_positions, key=lambda p: p.time)
]

def _sliding_triples(
self, points: List[s2sphere.LatLng]
) -> List[List[s2sphere.LatLng]]:
"""
Returns a list of triples of consecutive positions in passed the list.
"""
return [
(points[i], points[i + 1], points[i + 2]) for i in range(len(points) - 2)
]

def evaluate_sp_flight_recent_positions_crossing_area_boundary(
self, requested_area: s2sphere.LatLngRect, f: Flight, participants: List[str]
):
with self._test_scenario.check(
"Recent positions for aircraft crossing the requested area boundary show only one position before or after crossing",
participants,
) as check:

def fail_check():
check.record_failed(
"A position outside the area was neither preceded nor followed by a position inside the area.",
details=f"Positions: {f.recent_positions}, requested_area: {requested_area}",
severity=Severity.Medium,
)

positions = self._chronological_positions(f)
if len(positions) < 2:
# Check does not apply in this case
return

if len(positions) == 2:
# Only one of the positions can be outside the area. If both are, we fail.
if not requested_area.contains(
positions[0]
) and not requested_area.contains(positions[1]):
fail_check()
return

# For each sliding triple we check that if the middle position is outside the area, then either
# the first or the last position is inside the area. This means checking for any point that is inside the
# area in the triple and failing otherwise
for triple in self._sliding_triples(self._chronological_positions(f)):
if not (
requested_area.contains(triple[0])
or requested_area.contains(triple[1])
or requested_area.contains(triple[2])
):
fail_check()

# Finally we need to check for the forbidden corner cases of having the two first or two last positions being outside.
# (These won't be caught by the iteration on the triples above)
if (
not requested_area.contains(positions[0])
and not requested_area.contains(positions[1])
) or (
not requested_area.contains(positions[-1])
and not requested_area.contains(positions[-2])
):
fail_check()

def evaluate_sp_details(self, details: FlightDetails, participants: List[str]):
"""Implements fragment documented in `common_dictionary_evaluator_sp_flight_details.md`."""

Expand Down
Loading
Loading