Skip to content

Commit

Permalink
[mock_uss/scd] Handle retrieval of op intent details when USS is down…
Browse files Browse the repository at this point in the history
… according to SCD0005+SD0010 (#338)

* [mock_uss/scd] Handle retrieval of op intent details when USS is down according to SCD0005+SD0010

* set US locality highest_priority to 0
  • Loading branch information
mickmis authored Nov 14, 2023
1 parent 6409160 commit 3b16511
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 5 deletions.
77 changes: 72 additions & 5 deletions monitoring/mock_uss/f3548v21/flight_planning.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from monitoring.monitorlib.clients.flight_planning.flight_info import (
FlightInfo,
)
from monitoring.monitorlib.fetch import QueryError
from monitoring.uss_qualifier.resources.overrides import apply_overrides
from uas_standards.astm.f3548.v21 import api as f3548_v21
from uas_standards.astm.f3548.v21.constants import OiMaxVertices, OiMaxPlanHorizonDays
Expand Down Expand Up @@ -281,10 +282,12 @@ def op_intent_from_flightrecord(


def query_operational_intents(
locality: Locality,
area_of_interest: f3548_v21.Volume4D,
) -> List[f3548_v21.OperationalIntent]:
"""Retrieve a complete set of operational intents in an area, including details.
:param locality: Locality applicable to this query
:param area_of_interest: Area where intersecting operational intents must be discovered
:return: Full definition for every operational intent discovered
"""
Expand Down Expand Up @@ -314,10 +317,22 @@ def query_operational_intents(

updated_op_intents = []
for op_intent_ref in get_details_for:
op_intent, _ = scd_client.get_operational_intent_details(
utm_client, op_intent_ref.uss_base_url, op_intent_ref.id
)
updated_op_intents.append(op_intent)
try:
op_intent, _ = scd_client.get_operational_intent_details(
utm_client, op_intent_ref.uss_base_url, op_intent_ref.id
)
updated_op_intents.append(op_intent)
except QueryError as e:
if op_intent_ref.uss_availability == f3548_v21.UssAvailabilityState.Down:
# if the USS does not respond to request for details, and if it marked as down at the DSS, then we don't
# have to fail and can assume specific values for details
op_intent = get_down_uss_op_intent(
locality, area_of_interest, op_intent_ref
)
updated_op_intents.append(op_intent)
else:
# if the USS is not marked as down we just let the error bubble up
raise e
result.extend(updated_op_intents)

with db as tx:
Expand All @@ -327,6 +342,58 @@ def query_operational_intents(
return result


def get_down_uss_op_intent(
locality: Locality,
area_of_interest: f3548_v21.Volume4D,
op_intent_ref: f3548_v21.OperationalIntentReference,
) -> f3548_v21.OperationalIntent:
"""This function determines the operational intent to be considered in case its managing USS is determined to be
down and does not respond to the requests for details.
Note: This function will populate volumes (for accepted or activated states) and off_nominal_volumes (for contingent
and non-conforming states) with the area of interest that was requested. The reason is that later on the function
`check_for_disallowed_conflicts` will need to evaluate again those conflicts to determine pre-existing conflicts.
TODO: A better approach to this issue would be to store the area in conflict when a flight is planned with a
conflict, that way we can just retrieve the conflicting area instead of having to compute again the intersection
between the flight to be planned and the conflicting operational intent.
"""
# USS is declared as down and does not answer for details : minimum - 1
if op_intent_ref == f3548_v21.OperationalIntentState.Accepted:
return f3548_v21.OperationalIntent(
reference=op_intent_ref,
details=f3548_v21.OperationalIntentDetails(
volumes=[area_of_interest],
priority=locality.lowest_bound_priority(),
),
)

elif op_intent_ref == f3548_v21.OperationalIntentState.Activated:
return f3548_v21.OperationalIntent(
reference=op_intent_ref,
details=f3548_v21.OperationalIntentDetails(
volumes=[area_of_interest],
priority=locality.highest_priority(),
),
)

elif (
op_intent_ref == f3548_v21.OperationalIntentState.Contingent
or op_intent_ref == f3548_v21.OperationalIntentState.Nonconforming
):
return f3548_v21.OperationalIntent(
reference=op_intent_ref,
details=f3548_v21.OperationalIntentDetails(
off_nominal_volumes=[area_of_interest],
priority=locality.highest_priority(),
),
)

else:
raise ValueError(
f"operational intent {op_intent_ref.id}: invalid state {op_intent_ref.state}"
)


def check_op_intent(
new_flight: FlightRecord,
existing_flight: Optional[FlightRecord],
Expand Down Expand Up @@ -360,7 +427,7 @@ def check_op_intent(
+ new_flight.op_intent.details.off_nominal_volumes
)
vol4 = v1.bounding_volume.to_f3548v21()
op_intents = query_operational_intents(vol4)
op_intents = query_operational_intents(locality, vol4)

# Check for intersections
log(
Expand Down
22 changes: 22 additions & 0 deletions monitoring/monitorlib/locality.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ def allows_same_priority_intersections(self, priority: int) -> bool:
"""Returns true iff locality allows intersections between two operations at this priority level for ASTM F3548-21"""
raise NotImplementedError(Locality._NOT_IMPLEMENTED_MSG)

@abstractmethod
def lowest_bound_priority(self) -> int:
"""Returns the lowest bound priority status for ASTM F3548-21, which is a priority level lower than the lowest priority bound defined by the regulator of this locality"""
raise NotImplementedError(Locality._NOT_IMPLEMENTED_MSG)

@abstractmethod
def highest_priority(self) -> int:
"""Returns the highest priority level for ASTM F3548-21 defined by the regulator of this locality"""
raise NotImplementedError(Locality._NOT_IMPLEMENTED_MSG)

def __str__(self):
return self.__class__.__name__

Expand Down Expand Up @@ -56,6 +66,12 @@ def is_uspace_applicable(self) -> bool:
def allows_same_priority_intersections(self, priority: int) -> bool:
return False

def lowest_bound_priority(self) -> int:
return -1

def highest_priority(self) -> int:
return 100


class UnitedStatesIndustryCollaboration(Locality):
@classmethod
Expand All @@ -67,3 +83,9 @@ def is_uspace_applicable(self) -> bool:

def allows_same_priority_intersections(self, priority: int) -> bool:
return False

def lowest_bound_priority(self) -> int:
return -1

def highest_priority(self) -> int:
return 0 # as of the time of writing this, this value has not been subject to a firm decision

0 comments on commit 3b16511

Please sign in to comment.