1
- import datetime
2
1
import math
3
2
from typing import List , Optional
4
3
5
- import s2sphere
6
4
from arrow import ParserError
7
5
from implicitdict import StringBasedDateTime
8
6
from uas_standards .ansi_cta_2063_a import SerialNumber
25
23
)
26
24
27
25
from monitoring .monitorlib .fetch .rid import (
28
- FetchedFlights ,
29
26
FlightDetails ,
30
27
)
31
28
from monitoring .monitorlib .fetch .rid import Flight , Position
32
29
from monitoring .monitorlib .geo import validate_lat , validate_lng , Altitude , LatLngPoint
33
30
from monitoring .monitorlib .rid import RIDVersion
34
31
from monitoring .uss_qualifier .common_data_definitions import Severity
32
+ from monitoring .uss_qualifier .configurations .configuration import ParticipantID
35
33
from monitoring .uss_qualifier .resources .netrid .evaluation import EvaluationConfiguration
36
- from monitoring .uss_qualifier .scenarios .scenario import TestScenarioType , PendingCheck
37
-
38
- # SP responses to /flights endpoint's p99 should be below this:
39
- SP_FLIGHTS_RESPONSE_TIME_TOLERANCE_SEC = 3
40
- NET_MAX_NEAR_REAL_TIME_DATA_PERIOD_SEC = 60
41
- _POSITION_TIMESTAMP_MAX_AGE_SEC = (
42
- NET_MAX_NEAR_REAL_TIME_DATA_PERIOD_SEC + SP_FLIGHTS_RESPONSE_TIME_TOLERANCE_SEC
43
- )
34
+ from monitoring .uss_qualifier .scenarios .scenario import TestScenarioType
44
35
45
36
46
37
class RIDCommonDictionaryEvaluator (object ):
@@ -54,34 +45,17 @@ def __init__(
54
45
self ._test_scenario = test_scenario
55
46
self ._rid_version = rid_version
56
47
57
- def evaluate_sp_flights (
48
+ def evaluate_sp_flight (
58
49
self ,
59
- requested_area : s2sphere .LatLngRect ,
60
- observed_flights : FetchedFlights ,
61
- participants : List [str ],
50
+ observed_flight : Flight ,
51
+ participant_id : ParticipantID ,
62
52
):
63
53
"""Implements fragment documented in `common_dictionary_evaluator_sp_flight.md`."""
64
54
65
- for url , uss_flights in observed_flights .uss_flight_queries .items ():
66
- # For the timing checks, we want to look at the flights relative to the query
67
- # they came from, as they may be provided from different SP's.
68
- for f in uss_flights .flights :
69
- self .evaluate_sp_flight_recent_positions_times (
70
- f ,
71
- uss_flights .query .response .reported .datetime ,
72
- participants ,
73
- )
74
-
75
- self .evaluate_sp_flight_recent_positions_crossing_area_boundary (
76
- requested_area , f , participants
77
- )
78
-
79
- for f in observed_flights .flights :
80
- # Evaluate on all flights regardless of where they came from
81
- self ._evaluate_operational_status (
82
- f .operational_status ,
83
- participants ,
84
- )
55
+ self ._evaluate_operational_status (
56
+ observed_flight .operational_status ,
57
+ [participant_id ],
58
+ )
85
59
86
60
def evaluate_dp_flight (
87
61
self ,
@@ -121,95 +95,6 @@ def evaluate_dp_flight(
121
95
participants ,
122
96
)
123
97
124
- def _evaluate_recent_position_time (
125
- self , p : Position , query_time : datetime .datetime , check : PendingCheck
126
- ):
127
- """Check that the position's timestamp is at most 60 seconds before the request time."""
128
- if (query_time - p .time ).total_seconds () > _POSITION_TIMESTAMP_MAX_AGE_SEC :
129
- check .record_failed (
130
- "A Position timestamp was older than the tolerance." ,
131
- details = f"Position timestamp: { p .time } , query time: { query_time } " ,
132
- severity = Severity .Medium ,
133
- )
134
-
135
- def evaluate_sp_flight_recent_positions_times (
136
- self , f : Flight , query_time : datetime .datetime , participants : List [str ]
137
- ):
138
- with self ._test_scenario .check (
139
- "Recent positions timestamps" , participants
140
- ) as check :
141
- for p in f .recent_positions :
142
- self ._evaluate_recent_position_time (p , query_time , check )
143
-
144
- def _chronological_positions (self , f : Flight ) -> List [s2sphere .LatLng ]:
145
- """
146
- Returns the recent positions of the flight, ordered by time with the oldest first, and the most recent last.
147
- """
148
- return [
149
- s2sphere .LatLng .from_degrees (p .lat , p .lng )
150
- for p in sorted (f .recent_positions , key = lambda p : p .time )
151
- ]
152
-
153
- def _sliding_triples (
154
- self , points : List [s2sphere .LatLng ]
155
- ) -> List [List [s2sphere .LatLng ]]:
156
- """
157
- Returns a list of triples of consecutive positions in passed the list.
158
- """
159
- return [
160
- (points [i ], points [i + 1 ], points [i + 2 ]) for i in range (len (points ) - 2 )
161
- ]
162
-
163
- def evaluate_sp_flight_recent_positions_crossing_area_boundary (
164
- self , requested_area : s2sphere .LatLngRect , f : Flight , participants : List [str ]
165
- ):
166
- with self ._test_scenario .check (
167
- "Recent positions for aircraft crossing the requested area boundary show only one position before or after crossing" ,
168
- participants ,
169
- ) as check :
170
-
171
- def fail_check ():
172
- check .record_failed (
173
- "A position outside the area was neither preceded nor followed by a position inside the area." ,
174
- details = f"Positions: { f .recent_positions } , requested_area: { requested_area } " ,
175
- severity = Severity .Medium ,
176
- )
177
-
178
- positions = self ._chronological_positions (f )
179
- if len (positions ) < 2 :
180
- # Check does not apply in this case
181
- return
182
-
183
- if len (positions ) == 2 :
184
- # Only one of the positions can be outside the area. If both are, we fail.
185
- if not requested_area .contains (
186
- positions [0 ]
187
- ) and not requested_area .contains (positions [1 ]):
188
- fail_check ()
189
- return
190
-
191
- # For each sliding triple we check that if the middle position is outside the area, then either
192
- # the first or the last position is inside the area. This means checking for any point that is inside the
193
- # area in the triple and failing otherwise
194
- for triple in self ._sliding_triples (self ._chronological_positions (f )):
195
- if not (
196
- requested_area .contains (triple [0 ])
197
- or requested_area .contains (triple [1 ])
198
- or requested_area .contains (triple [2 ])
199
- ):
200
- fail_check ()
201
-
202
- # Finally we need to check for the forbidden corner cases of having the two first or two last positions being outside.
203
- # (These won't be caught by the iteration on the triples above)
204
- if (
205
- not requested_area .contains (positions [0 ])
206
- and not requested_area .contains (positions [1 ])
207
- ) or (
208
- not requested_area .contains (positions [- 1 ])
209
- and not requested_area .contains (positions [- 2 ])
210
- ):
211
- fail_check ()
212
-
213
98
def evaluate_sp_details (self , details : FlightDetails , participants : List [str ]):
214
99
"""Implements fragment documented in `common_dictionary_evaluator_sp_flight_details.md`."""
215
100
0 commit comments