diff --git a/src/gallia/cli/cursed_hr.py b/src/gallia/cli/cursed_hr.py index a5afab639..f729378ea 100644 --- a/src/gallia/cli/cursed_hr.py +++ b/src/gallia/cli/cursed_hr.py @@ -2,8 +2,6 @@ # # SPDX-License-Identifier: Apache-2.0 -from __future__ import annotations - import curses import curses.ascii import gzip diff --git a/src/gallia/log.py b/src/gallia/log.py index fdcbdf96f..9576fbfe0 100644 --- a/src/gallia/log.py +++ b/src/gallia/log.py @@ -2,8 +2,6 @@ # # SPDX-License-Identifier: Apache-2.0 -from __future__ import annotations - import atexit import dataclasses import datetime @@ -153,7 +151,7 @@ class PenlogPriority(IntEnum): TRACE = 8 @classmethod - def from_str(cls, string: str) -> PenlogPriority: + def from_str(cls, string: str) -> "PenlogPriority": """Converts a string to an instance of PenlogPriority. ``string`` can be a numeric value (0 to 8 inclusive) or a string with a case insensitive name of the level @@ -185,7 +183,7 @@ def from_str(cls, string: str) -> PenlogPriority: raise ValueError(f"{string} not a valid priority") @classmethod - def from_level(cls, value: int) -> PenlogPriority: + def from_level(cls, value: int) -> "PenlogPriority": """Converts an int value (e.g. from python's logging module) to an instance of this class. """ diff --git a/src/gallia/services/uds/core/client.py b/src/gallia/services/uds/core/client.py index 3e6ebbe61..070d6f2a9 100644 --- a/src/gallia/services/uds/core/client.py +++ b/src/gallia/services/uds/core/client.py @@ -2,8 +2,6 @@ # # SPDX-License-Identifier: Apache-2.0 -from __future__ import annotations - import asyncio import struct from collections.abc import Sequence diff --git a/src/gallia/services/uds/core/exception.py b/src/gallia/services/uds/core/exception.py index 60ff6a5d0..eb32cda81 100644 --- a/src/gallia/services/uds/core/exception.py +++ b/src/gallia/services/uds/core/exception.py @@ -2,8 +2,6 @@ # # SPDX-License-Identifier: Apache-2.0 -from __future__ import annotations - import asyncio from abc import ABC from typing import Any @@ -72,7 +70,7 @@ class UnexpectedResponse(ResponseException): class UnexpectedNegativeResponse(UnexpectedResponse, ABC): RESPONSE_CODE: UDSErrorCodes - _CONCRETE_EXCEPTIONS: dict[UDSErrorCodes | None, type[UnexpectedNegativeResponse]] = {} + _CONCRETE_EXCEPTIONS: dict[UDSErrorCodes | None, type["UnexpectedNegativeResponse"]] = {} def __init_subclass__(cls, /, response_code: UDSErrorCodes, **kwargs: Any) -> None: super().__init_subclass__(**kwargs) @@ -93,7 +91,7 @@ def __init__( @staticmethod def parse_dynamic( request: UDSRequest, response: NegativeResponse, message: str | None = None - ) -> UnexpectedNegativeResponse: + ) -> "UnexpectedNegativeResponse": return UnexpectedNegativeResponse._CONCRETE_EXCEPTIONS[response.response_code]( request, response, message ) diff --git a/src/gallia/services/uds/core/service.py b/src/gallia/services/uds/core/service.py index 23d8b121c..f5c7f5f47 100644 --- a/src/gallia/services/uds/core/service.py +++ b/src/gallia/services/uds/core/service.py @@ -2,14 +2,12 @@ # # SPDX-License-Identifier: Apache-2.0 -from __future__ import annotations - import inspect import struct from abc import ABC, abstractmethod from collections.abc import Sequence from struct import pack -from typing import Any, TypeVar +from typing import Any, Self, TypeVar from gallia.log import get_logger from gallia.services.uds.core.constants import ( @@ -49,7 +47,7 @@ class UDSRequest(ABC): SERVICE_ID: int | None - RESPONSE_TYPE: type[PositiveResponse] + RESPONSE_TYPE: type["PositiveResponse"] _MINIMAL_LENGTH: int _MAXIMAL_LENGTH: int | None @@ -57,7 +55,7 @@ def __init_subclass__( cls, /, service_id: int | None, - response_type: type[PositiveResponse], + response_type: type["PositiveResponse"], minimal_length: int, maximal_length: int | None, **kwargs: Any, @@ -115,7 +113,7 @@ def __repr__(self) -> str: return f"{title}({attributes})" @staticmethod - def parse_dynamic(pdu: bytes) -> UDSRequest: + def parse_dynamic(pdu: bytes) -> "UDSRequest": try: logger.trace("Dynamically parsing request") logger.trace(f" - Got PDU {pdu.hex()}") @@ -209,7 +207,7 @@ def matches(self, request: UDSRequest) -> bool: pass @staticmethod - def parse_dynamic(pdu: bytes) -> UDSResponse: + def parse_dynamic(pdu: bytes) -> "UDSResponse": if pdu[0] == UDSIsoServices.NegativeResponse: return NegativeResponse.from_pdu(pdu) @@ -313,7 +311,7 @@ def pdu(self) -> bytes: return pack("!BBB", self.SERVICE_ID, self.request_service_id, self.response_code) @classmethod - def _from_pdu(cls, pdu: bytes) -> NegativeResponse: + def _from_pdu(cls, pdu: bytes) -> "NegativeResponse": return NegativeResponse(pdu[1], UDSErrorCodes(pdu[2])) @classmethod @@ -372,7 +370,7 @@ def parse_static( class UDSService(ABC): SERVICE_ID: UDSIsoServices | None - _SERVICES: dict[UDSIsoServices | None, type[UDSService]] = {} + _SERVICES: dict[UDSIsoServices | None, type["UDSService"]] = {} Response: type[PositiveResponse] | None = None Request: type[UDSRequest] | None = None @@ -578,8 +576,8 @@ def __init__(self, pdu: bytes) -> None: self._pdu = pdu @classmethod - def _from_pdu(cls, pdu: bytes) -> RawRequest: - return RawRequest(pdu) + def _from_pdu(cls, pdu: bytes) -> Self: + return cls(pdu) @property def service_id(self) -> int: @@ -621,10 +619,10 @@ def pdu(self) -> bytes: ) @classmethod - def _from_pdu(cls, pdu: bytes) -> DiagnosticSessionControlResponse: + def _from_pdu(cls, pdu: bytes) -> Self: diagnostic_session_type = from_bytes(pdu[1:2]) session_parameter_record = pdu[2:] - return DiagnosticSessionControlResponse(diagnostic_session_type, session_parameter_record) + return cls(diagnostic_session_type, session_parameter_record) def __init__(self, diagnostic_session_type: int, session_parameter_record: bytes = b"") -> None: self.diagnostic_session_type = diagnostic_session_type @@ -668,8 +666,8 @@ def pdu(self) -> bytes: return pack("!BB", self.SERVICE_ID, self.sub_function_with_suppress_response_bit) @classmethod - def _from_pdu(cls, pdu: bytes) -> DiagnosticSessionControlRequest: - return DiagnosticSessionControlRequest(*sub_function_split(pdu[1])) + def _from_pdu(cls, pdu: bytes) -> Self: + return cls(*sub_function_split(pdu[1])) @property def sub_function(self) -> int: @@ -709,11 +707,11 @@ def pdu(self) -> bytes: return pack("!BBB", self.RESPONSE_SERVICE_ID, self.reset_type, self.power_down_time) @classmethod - def _from_pdu(cls, pdu: bytes) -> ECUResetResponse: + def _from_pdu(cls, pdu: bytes) -> Self: reset_type = pdu[1] power_down_time = pdu[2] if len(pdu) > 2 else None - return ECUResetResponse(reset_type, power_down_time) + return cls(reset_type, power_down_time) def matches(self, request: UDSRequest) -> bool: return isinstance(request, ECUResetRequest) and request.reset_type == self.reset_type @@ -747,8 +745,8 @@ def pdu(self) -> bytes: return pack("!BB", self.SERVICE_ID, self.sub_function_with_suppress_response_bit) @classmethod - def _from_pdu(cls, pdu: bytes) -> ECUResetRequest: - return ECUResetRequest(*sub_function_split(pdu[1])) + def _from_pdu(cls, pdu: bytes) -> Self: + return cls(*sub_function_split(pdu[1])) @property def sub_function(self) -> int: @@ -782,10 +780,10 @@ def pdu(self) -> bytes: return pack("!BB", self.RESPONSE_SERVICE_ID, self.security_access_type) + self.security_seed @classmethod - def _from_pdu(cls, pdu: bytes) -> SecurityAccessResponse: + def _from_pdu(cls, pdu: bytes) -> Self: security_access_type = from_bytes(pdu[1:2]) security_seed = pdu[2:] - return SecurityAccessResponse(security_access_type, security_seed) + return cls(security_access_type, security_seed) def matches(self, request: UDSRequest) -> bool: return ( @@ -856,12 +854,10 @@ def pdu(self) -> bytes: ) @classmethod - def _from_pdu(cls, pdu: bytes) -> RequestSeedRequest: + def _from_pdu(cls, pdu: bytes) -> Self: security_access_type, suppress_response = sub_function_split(pdu[1]) security_access_data_record = pdu[2:] - return RequestSeedRequest( - security_access_type, security_access_data_record, suppress_response - ) + return cls(security_access_type, security_access_data_record, suppress_response) class SendKeyRequest( @@ -904,10 +900,10 @@ def pdu(self) -> bytes: ) @classmethod - def _from_pdu(cls, pdu: bytes) -> SendKeyRequest: + def _from_pdu(cls, pdu: bytes) -> Self: security_access_type, suppress_response = sub_function_split(pdu[1]) security_key = pdu[2:] - return SendKeyRequest(security_access_type, security_key, suppress_response) + return cls(security_access_type, security_key, suppress_response) class SecurityAccess(SpecializedSubFunctionService, service_id=UDSIsoServices.SecurityAccess): @@ -946,9 +942,9 @@ def pdu(self) -> bytes: return pack("!BB", self.RESPONSE_SERVICE_ID, self.control_type) @classmethod - def _from_pdu(cls, pdu: bytes) -> CommunicationControlResponse: + def _from_pdu(cls, pdu: bytes) -> Self: control_type = pdu[1] - return CommunicationControlResponse(control_type) + return cls(control_type) def matches(self, request: UDSRequest) -> bool: return ( @@ -998,11 +994,11 @@ def pdu(self) -> bytes: ) @classmethod - def _from_pdu(cls, pdu: bytes) -> CommunicationControlRequest: + def _from_pdu(cls, pdu: bytes) -> Self: control_type, suppress_response = sub_function_split(pdu[1]) communication_type = pdu[2] - return CommunicationControlRequest(control_type, communication_type, suppress_response) + return cls(control_type, communication_type, suppress_response) @property def sub_function(self) -> int: @@ -1031,8 +1027,8 @@ def pdu(self) -> bytes: return pack("!BB", self.RESPONSE_SERVICE_ID, self.SUB_FUNCTION_ID) @classmethod - def _from_pdu(cls, pdu: bytes) -> TesterPresentResponse: - return TesterPresentResponse() + def _from_pdu(cls, pdu: bytes) -> Self: + return cls() def matches(self, request: UDSRequest) -> bool: return isinstance(request, TesterPresentRequest) @@ -1061,8 +1057,8 @@ def pdu(self) -> bytes: return pack("!BB", self.SERVICE_ID, self.sub_function_with_suppress_response_bit) @classmethod - def _from_pdu(cls, pdu: bytes) -> TesterPresentRequest: - return TesterPresentRequest(cls.suppress_response_set(pdu)) + def _from_pdu(cls, pdu: bytes) -> Self: + return cls(cls.suppress_response_set(pdu)) class TesterPresent(UDSService, service_id=UDSIsoServices.TesterPresent): @@ -1106,9 +1102,9 @@ def pdu(self) -> bytes: return pack("!BB", self.RESPONSE_SERVICE_ID, self.dtc_setting_type) @classmethod - def _from_pdu(cls, pdu: bytes) -> ControlDTCSettingResponse: + def _from_pdu(cls, pdu: bytes) -> Self: dtc_setting_type = pdu[1] - return ControlDTCSettingResponse(dtc_setting_type) + return cls(dtc_setting_type) def matches(self, request: UDSRequest) -> bool: return ( @@ -1156,13 +1152,11 @@ def pdu(self) -> bytes: ) @classmethod - def _from_pdu(cls, pdu: bytes) -> ControlDTCSettingRequest: + def _from_pdu(cls, pdu: bytes) -> Self: dtc_setting_type, suppress_response = sub_function_split(pdu[1]) dtc_setting_control_option_record = pdu[2:] - return ControlDTCSettingRequest( - dtc_setting_type, dtc_setting_control_option_record, suppress_response - ) + return cls(dtc_setting_type, dtc_setting_control_option_record, suppress_response) @property def sub_function(self) -> int: @@ -1248,7 +1242,7 @@ def pdu(self) -> bytes: return pdu @classmethod - def _from_pdu(cls, pdu: bytes) -> ReadDataByIdentifierResponse: + def _from_pdu(cls, pdu: bytes) -> Self: # Without knowing the lengths of the dataRecords in a response with multiple dataIdentifiers # and dataRecords it's not possible to recover all ids. # Therefore, only the first identifier is used and the rest is simply attributed to the @@ -1256,7 +1250,7 @@ def _from_pdu(cls, pdu: bytes) -> ReadDataByIdentifierResponse: data_identifier = from_bytes(pdu[1:3]) data_record = pdu[3:] - return ReadDataByIdentifierResponse(data_identifier, data_record) + return cls(data_identifier, data_record) def matches(self, request: UDSRequest) -> bool: if not isinstance(request, ReadDataByIdentifierRequest): @@ -1319,13 +1313,13 @@ def data_identifier(self, data_identifier: int) -> None: self.data_identifiers[0] = data_identifier @classmethod - def _from_pdu(cls, pdu: bytes) -> ReadDataByIdentifierRequest: + def _from_pdu(cls, pdu: bytes) -> Self: identifiers: list[int] = [] for i in range(1, len(pdu), 2): identifiers.append(from_bytes(pdu[i : i + 2])) - return ReadDataByIdentifierRequest(identifiers) + return cls(identifiers) @property def pdu(self) -> bytes: @@ -1357,10 +1351,10 @@ def pdu(self) -> bytes: return pack("!B", self.RESPONSE_SERVICE_ID) + self.data_record @classmethod - def _from_pdu(cls, pdu: bytes) -> ReadMemoryByAddressResponse: + def _from_pdu(cls, pdu: bytes) -> Self: data_record = pdu[1:] - return ReadMemoryByAddressResponse(data_record) + return cls(data_record) def __init__(self, data_record: bytes) -> None: super().__init__() @@ -1422,14 +1416,14 @@ def pdu(self) -> bytes: return pdu @classmethod - def _from_pdu(cls, pdu: bytes) -> ReadMemoryByAddressRequest: + def _from_pdu(cls, pdu: bytes) -> Self: address_and_length_format_identifier = pdu[1] address_length, size_length = address_and_size_length(address_and_length_format_identifier) if len(pdu) != 2 + address_length + size_length: raise ValueError("The addressAndLengthIdentifier is incompatible with the PDU size") - return ReadMemoryByAddressRequest( + return cls( from_bytes(pdu[2 : 2 + address_length]), from_bytes(pdu[2 + address_length : 2 + address_length + size_length]), address_and_length_format_identifier, @@ -1479,9 +1473,9 @@ def pdu(self) -> bytes: return pack("!BH", self.RESPONSE_SERVICE_ID, self.data_identifier) @classmethod - def _from_pdu(cls, pdu: bytes) -> WriteDataByIdentifierResponse: + def _from_pdu(cls, pdu: bytes) -> Self: data_identifier = from_bytes(pdu[1:3]) - return WriteDataByIdentifierResponse(data_identifier) + return cls(data_identifier) def matches(self, request: UDSRequest) -> bool: return ( @@ -1517,10 +1511,10 @@ def pdu(self) -> bytes: return pack("!BH", self.SERVICE_ID, self.data_identifier) + self.data_record @classmethod - def _from_pdu(cls, pdu: bytes) -> WriteDataByIdentifierRequest: + def _from_pdu(cls, pdu: bytes) -> Self: data_identifier = from_bytes(pdu[1:3]) data_record = pdu[3:] - return WriteDataByIdentifierRequest(data_identifier, data_record) + return cls(data_identifier, data_record) class WriteDataByIdentifier(UDSService, service_id=UDSIsoServices.WriteDataByIdentifier): @@ -1552,7 +1546,7 @@ def pdu(self) -> bytes: return pdu @classmethod - def _from_pdu(cls, pdu: bytes) -> WriteMemoryByAddressResponse: + def _from_pdu(cls, pdu: bytes) -> Self: address_and_length_format_identifier = pdu[1] addr_len, size_len = address_and_size_length(address_and_length_format_identifier) @@ -1564,9 +1558,7 @@ def _from_pdu(cls, pdu: bytes) -> WriteMemoryByAddressResponse: memory_address = from_bytes(pdu[2 : 2 + addr_len]) memory_size = from_bytes(pdu[2 + addr_len : 2 + addr_len + size_len]) - return WriteMemoryByAddressResponse( - memory_address, memory_size, address_and_length_format_identifier - ) + return cls(memory_address, memory_size, address_and_length_format_identifier) def __init__( self, @@ -1654,14 +1646,14 @@ def pdu(self) -> bytes: return pdu @classmethod - def _from_pdu(cls, pdu: bytes) -> WriteMemoryByAddressRequest: + def _from_pdu(cls, pdu: bytes) -> Self: address_and_length_format_identifier = pdu[1] address_length, size_length = address_and_size_length(address_and_length_format_identifier) if len(pdu) < 2 + address_length + size_length: raise ValueError("The addressAndLengthIdentifier is incompatible with the PDU size") - return WriteMemoryByAddressRequest( + return cls( from_bytes(pdu[2 : 2 + address_length]), pdu[2 + address_length + size_length :], from_bytes(pdu[2 + address_length : 2 + address_length + size_length]), @@ -1691,8 +1683,8 @@ def pdu(self) -> bytes: return bytes([self.RESPONSE_SERVICE_ID]) @classmethod - def _from_pdu(cls, pdu: bytes) -> ClearDiagnosticInformationResponse: - return ClearDiagnosticInformationResponse() + def _from_pdu(cls, pdu: bytes) -> Self: + return cls() def matches(self, request: UDSRequest) -> bool: return isinstance(request, ClearDiagnosticInformationRequest) @@ -1721,9 +1713,9 @@ def pdu(self) -> bytes: return bytes([self.SERVICE_ID]) + to_bytes(self.group_of_dtc, 3) @classmethod - def _from_pdu(cls, pdu: bytes) -> ClearDiagnosticInformationRequest: + def _from_pdu(cls, pdu: bytes) -> Self: group_of_dtc = from_bytes(pdu[1:]) - return ClearDiagnosticInformationRequest(group_of_dtc) + return cls(group_of_dtc) class ClearDiagnosticInformation(UDSService, service_id=UDSIsoServices.ClearDiagnosticInformation): @@ -2397,10 +2389,10 @@ def pdu(self) -> bytes: ) @classmethod - def _from_pdu(cls, pdu: bytes) -> InputOutputControlByIdentifierResponse: + def _from_pdu(cls, pdu: bytes) -> Self: data_identifier = from_bytes(pdu[1:3]) control_status_record = pdu[3:] - return InputOutputControlByIdentifierResponse(data_identifier, control_status_record) + return cls(data_identifier, control_status_record) def matches(self, request: UDSRequest) -> bool: return ( @@ -2457,7 +2449,7 @@ def pdu(self) -> bytes: ) @classmethod - def _from_pdu(cls, pdu: bytes) -> InputOutputControlByIdentifierRequest: + def _from_pdu(cls, pdu: bytes) -> Self: # Because both the controlOptionRecord as well as the controlEnableMaskRecord are of # variable size, and there is no field which describes those parameters, # it is impossible for the server to determine those fields reliably without vendor or ECU @@ -2467,9 +2459,7 @@ def _from_pdu(cls, pdu: bytes) -> InputOutputControlByIdentifierRequest: data_identifier = from_bytes(pdu[1:3]) control_option_record = pdu[3:] control_enable_mask_record = b"" - return InputOutputControlByIdentifierRequest( - data_identifier, control_option_record, control_enable_mask_record - ) + return cls(data_identifier, control_option_record, control_enable_mask_record) class ReturnControlToECUResponse( @@ -2514,10 +2504,10 @@ def __init__(self, data_identifier: int, control_enable_mask_record: bytes = b"" ) @classmethod - def _from_pdu(cls, pdu: bytes) -> ReturnControlToECURequest: + def _from_pdu(cls, pdu: bytes) -> Self: data_identifier = from_bytes(pdu[1:3]) control_enable_mask_record = pdu[4:] - return ReturnControlToECURequest(data_identifier, control_enable_mask_record) + return cls(data_identifier, control_enable_mask_record) class ResetToDefaultResponse( @@ -2562,10 +2552,10 @@ def __init__(self, data_identifier: int, control_enable_mask_record: bytes = b"" ) @classmethod - def _from_pdu(cls, pdu: bytes) -> ResetToDefaultRequest: + def _from_pdu(cls, pdu: bytes) -> Self: data_identifier = from_bytes(pdu[1:3]) control_enable_mask_record = pdu[4:] - return ResetToDefaultRequest(data_identifier, control_enable_mask_record) + return cls(data_identifier, control_enable_mask_record) class FreezeCurrentStateResponse( @@ -2609,10 +2599,10 @@ def __init__(self, data_identifier: int, control_enable_mask_record: bytes = b"" ) @classmethod - def _from_pdu(cls, pdu: bytes) -> FreezeCurrentStateRequest: + def _from_pdu(cls, pdu: bytes) -> Self: data_identifier = from_bytes(pdu[1:3]) control_enable_mask_record = pdu[4:] - return FreezeCurrentStateRequest(data_identifier, control_enable_mask_record) + return cls(data_identifier, control_enable_mask_record) class ShortTermAdjustmentResponse( @@ -3166,10 +3156,10 @@ def __init__( self.transfer_response_parameter_record = transfer_response_parameter_record @classmethod - def _from_pdu(cls, pdu: bytes) -> TransferDataResponse: + def _from_pdu(cls, pdu: bytes) -> Self: block_sequence_counter = pdu[1] transfer_response_parameter_record = pdu[2:] - return TransferDataResponse(block_sequence_counter, transfer_response_parameter_record) + return cls(block_sequence_counter, transfer_response_parameter_record) @property def pdu(self) -> bytes: @@ -3212,10 +3202,10 @@ def __init__( self.transfer_request_parameter_record = transfer_request_parameter_record @classmethod - def _from_pdu(cls, pdu: bytes) -> TransferDataRequest: + def _from_pdu(cls, pdu: bytes) -> Self: block_sequence_counter = pdu[1] transfer_request_parameter_record = pdu[2:] - return TransferDataRequest(block_sequence_counter, transfer_request_parameter_record) + return cls(block_sequence_counter, transfer_request_parameter_record) @property def pdu(self) -> bytes: @@ -3252,9 +3242,9 @@ def pdu(self) -> bytes: return bytes([self.RESPONSE_SERVICE_ID]) + self.transfer_response_parameter_record @classmethod - def _from_pdu(cls, pdu: bytes) -> RequestTransferExitResponse: + def _from_pdu(cls, pdu: bytes) -> Self: transfer_response_parameter_record = pdu[1:] - return RequestTransferExitResponse(transfer_response_parameter_record) + return cls(transfer_response_parameter_record) def matches(self, request: UDSRequest) -> bool: return isinstance(request, RequestTransferExitRequest) @@ -3281,9 +3271,9 @@ def pdu(self) -> bytes: return bytes([self.SERVICE_ID]) + self.transfer_request_parameter_record @classmethod - def _from_pdu(cls, pdu: bytes) -> RequestTransferExitRequest: + def _from_pdu(cls, pdu: bytes) -> Self: transfer_request_parameter_record = pdu[1:] - return RequestTransferExitRequest(transfer_request_parameter_record) + return cls(transfer_request_parameter_record) class RequestTransferExit(UDSService, service_id=UDSIsoServices.RequestTransferExit): diff --git a/src/gallia/services/uds/ecu.py b/src/gallia/services/uds/ecu.py index 424526844..61b9c9ff5 100644 --- a/src/gallia/services/uds/ecu.py +++ b/src/gallia/services/uds/ecu.py @@ -2,8 +2,6 @@ # # SPDX-License-Identifier: Apache-2.0 -from __future__ import annotations - import asyncio from asyncio import Task from datetime import UTC, datetime diff --git a/src/gallia/transports/doip.py b/src/gallia/transports/doip.py index 5d1d0174a..119ee6ef7 100644 --- a/src/gallia/transports/doip.py +++ b/src/gallia/transports/doip.py @@ -2,8 +2,6 @@ # # SPDX-License-Identifier: Apache-2.0 -from __future__ import annotations - import asyncio import socket import struct @@ -40,7 +38,7 @@ class RoutingActivationRequestTypes(IntEnum): CentralSecurity = 0xE0 @classmethod - def _missing_(cls, value: Any) -> RoutingActivationRequestTypes: + def _missing_(cls, value: Any) -> "RoutingActivationRequestTypes": if value in range(0xE1, 0x100): return cls.ManufacturerSpecific return cls.RESERVED @@ -62,7 +60,7 @@ class RoutingActivationResponseCodes(IntEnum): SuccessConfirmationRequired = 0x11 @classmethod - def _missing_(cls, value: Any) -> RoutingActivationResponseCodes: + def _missing_(cls, value: Any) -> "RoutingActivationResponseCodes": if value in range(0xE0, 0xFF): return cls.ManufacturerSpecific return cls.RESERVED @@ -113,7 +111,7 @@ class DiagnosticMessageNegativeAckCodes(IntEnum): TransportProtocolError = 0x08 @classmethod - def _missing_(cls, value: Any) -> DiagnosticMessageNegativeAckCodes: + def _missing_(cls, value: Any) -> "DiagnosticMessageNegativeAckCodes": return cls.RESERVED @@ -135,7 +133,7 @@ class GenericDoIPHeaderNACKCodes(IntEnum): InvalidPayloadLength = 0x04 @classmethod - def _missing_(cls, value: Any) -> GenericDoIPHeaderNACKCodes: + def _missing_(cls, value: Any) -> "GenericDoIPHeaderNACKCodes": return cls.RESERVED @@ -178,7 +176,7 @@ def pack(self) -> bytes: ) @classmethod - def unpack(cls, data: bytes) -> GenericHeader: + def unpack(cls, data: bytes) -> Self: ( protocol_version, inverse_protocol_version, @@ -205,7 +203,7 @@ def pack(self) -> bytes: ) @classmethod - def unpack(cls, data: bytes) -> GenericDoIPHeaderNACK: + def unpack(cls, data: bytes) -> Self: (generic_header_NACK_code,) = struct.unpack("!B", data) return cls( GenericDoIPHeaderNACKCodes(generic_header_NACK_code), @@ -217,6 +215,29 @@ class VehicleIdentificationRequestMessage: def pack(self) -> bytes: return b"" +@unique +class FurtherActionCodes(IntEnum): + RESERVED = 0x0F + ManufacturerSpecific = 0xFF + NoFurtherActionRequired = 0x00 + RoutingActivationRequiredToInitiateCentralSecurity = 0x10 + + @classmethod + def _missing_(cls, value: Any) -> "FurtherActionCodes": + if value in range(0x11, 0x100): + return cls.ManufacturerSpecific + return cls.RESERVED + +@unique +class SynchronisationStatusCodes(IntEnum): + RESERVED = 0xFF + VINGIDSynchronized = 0x00 + IncompleteVINGIDNotSynchronized = 0x10 + + @classmethod + def _missing_(cls, value: Any) -> "SynchronisationStatusCodes": + return cls.RESERVED + @dataclass class VehicleAnnouncementMessage: @@ -228,7 +249,7 @@ class VehicleAnnouncementMessage: VINGIDSyncStatus: SynchronisationStatusCodes | None @classmethod - def unpack(cls, data: bytes) -> VehicleAnnouncementMessage: + def unpack(cls, data: bytes) -> Self: if len(data) == 32: # VINGIDSyncStatus is optional (vin, logical_address, eid, gid, further_action_required) = struct.unpack( @@ -257,37 +278,23 @@ def unpack(cls, data: bytes) -> VehicleAnnouncementMessage: ) -@unique -class FurtherActionCodes(IntEnum): - RESERVED = 0x0F - ManufacturerSpecific = 0xFF - NoFurtherActionRequired = 0x00 - RoutingActivationRequiredToInitiateCentralSecurity = 0x10 - - @classmethod - def _missing_(cls, value: Any) -> FurtherActionCodes: - if value in range(0x11, 0x100): - return cls.ManufacturerSpecific - return cls.RESERVED +@dataclass +class DoIPEntityStatusRequest: + def pack(self) -> bytes: + return b"" @unique -class SynchronisationStatusCodes(IntEnum): +class NodeTypes(IntEnum): RESERVED = 0xFF - VINGIDSynchronized = 0x00 - IncompleteVINGIDNotSynchronized = 0x10 + Gateway = 0x00 + Node = 0x01 @classmethod - def _missing_(cls, value: Any) -> SynchronisationStatusCodes: + def _missing_(cls, value: Any) -> "NodeTypes": return cls.RESERVED -@dataclass -class DoIPEntityStatusRequest: - def pack(self) -> bytes: - return b"" - - @dataclass class DoIPEntityStatusResponse: NodeType: NodeTypes @@ -296,7 +303,7 @@ class DoIPEntityStatusResponse: MaximumDataSize: int | None @classmethod - def unpack(cls, data: bytes) -> DoIPEntityStatusResponse: + def unpack(cls, data: bytes) -> Self: if len(data) == 3: # MaximumDataSize is optional (nt, mcts, ncts) = struct.unpack("!BBB", data) @@ -307,17 +314,6 @@ def unpack(cls, data: bytes) -> DoIPEntityStatusResponse: return cls(NodeTypes(nt), mcts, ncts, mds) -@unique -class NodeTypes(IntEnum): - RESERVED = 0xFF - Gateway = 0x00 - Node = 0x01 - - @classmethod - def _missing_(cls, value: Any) -> NodeTypes: - return cls.RESERVED - - @dataclass class RoutingActivationRequest: SourceAddress: int @@ -338,7 +334,7 @@ class RoutingActivationResponse: # OEMReserved uint32 @classmethod - def unpack(cls, data: bytes) -> RoutingActivationResponse: + def unpack(cls, data: bytes) -> Self: ( source_address, target_address, @@ -372,7 +368,7 @@ def pack(self) -> bytes: ) @classmethod - def unpack(cls, data: bytes) -> DiagnosticMessage: + def unpack(cls, data: bytes) -> Self: source_address, target_address = struct.unpack("!HH", data[:4]) data = data[4:] return cls(source_address, target_address, data) @@ -401,7 +397,7 @@ class DiagnosticMessagePositiveAcknowledgement(DiagnosticMessageAcknowledgement) ACKCode: DiagnosticMessagePositiveAckCodes @classmethod - def unpack(cls, data: bytes) -> DiagnosticMessagePositiveAcknowledgement: + def unpack(cls, data: bytes) -> Self: source_address, target_address, ack_code = struct.unpack("!HHB", data[:5]) prev_data = data[5:] @@ -417,7 +413,7 @@ class DiagnosticMessageNegativeAcknowledgement(DiagnosticMessageAcknowledgement) ACKCode: DiagnosticMessageNegativeAckCodes @classmethod - def unpack(cls, data: bytes) -> DiagnosticMessageNegativeAcknowledgement: + def unpack(cls, data: bytes) -> Self: source_address, target_address, ack_code = struct.unpack("!HHB", data[:5]) prev_data = data[5:] diff --git a/src/gallia/transports/hsfz.py b/src/gallia/transports/hsfz.py index f54a28e37..74d502e90 100644 --- a/src/gallia/transports/hsfz.py +++ b/src/gallia/transports/hsfz.py @@ -2,8 +2,6 @@ # # SPDX-License-Identifier: Apache-2.0 -from __future__ import annotations - import asyncio import errno import struct @@ -38,7 +36,7 @@ class HSFZStatus(IntEnum): OutOfMemory = 0xFF @classmethod - def _missing_(cls, value: Any) -> HSFZStatus: + def _missing_(cls, value: Any) -> "HSFZStatus": return cls.UNDEFINED @@ -65,7 +63,7 @@ def pack(self) -> bytes: return struct.pack("!BB", self.src_addr, self.dst_addr) @classmethod - def unpack(cls, data: bytes) -> HSFZDiagReqHeader: + def unpack(cls, data: bytes) -> Self: src_addr, dst_addr = struct.unpack("!BB", data) return cls(src_addr, dst_addr) @@ -105,7 +103,7 @@ async def connect( src_addr: int, dst_addr: int, ack_timeout: float, - ) -> HSFZConnection: + ) -> Self: reader, writer = await asyncio.open_connection(host, port) return cls( reader, @@ -332,7 +330,7 @@ async def connect( cls, target: str | TargetURI, timeout: float | None = None, - ) -> HSFZTransport: + ) -> Self: t = TargetURI(target) if isinstance(target, str) else target if t.hostname is None: raise ValueError("no hostname specified") diff --git a/src/gallia/transports/tcp.py b/src/gallia/transports/tcp.py index 0a640c2ae..c3c054214 100644 --- a/src/gallia/transports/tcp.py +++ b/src/gallia/transports/tcp.py @@ -2,8 +2,6 @@ # # SPDX-License-Identifier: Apache-2.0 -from __future__ import annotations - import asyncio from typing import Self diff --git a/src/gallia/utils.py b/src/gallia/utils.py index 336bee5bc..da2bb731d 100644 --- a/src/gallia/utils.py +++ b/src/gallia/utils.py @@ -2,8 +2,6 @@ # # SPDX-License-Identifier: Apache-2.0 -from __future__ import annotations - import asyncio import contextvars import importlib.util