diff --git a/CHANGELOG.md b/CHANGELOG.md index c1c3337e..76f64a02 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,15 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [0.12.0] 2024-??-?? + +### Added + +- NFC communication available +- Starting Speculos with the `--transport` argument allows to choose U2F, HID or NFC transport +- Flex and Stax OSes emulation always consider NFC to be up (it can't be deactivated for now) + ## [0.11.0] 2024-11-12 ### Added diff --git a/speculos/main.py b/speculos/main.py index f7ba54db..96f9aa23 100644 --- a/speculos/main.py +++ b/speculos/main.py @@ -29,6 +29,7 @@ from .mcu.finger_tcp import FakeFinger from .mcu.struct import DisplayArgs, ServerArgs from .mcu.vnc import VNC +from .mcu.transport import TransportType from .observer import BroadcastInterface from .resources_importer import resources @@ -268,7 +269,9 @@ def main(prog=None) -> int: 'to use a hex seed, prefix it with "hex:"') parser.add_argument('-t', '--trace', action='store_true', help='Trace syscalls') parser.add_argument('-u', '--usb', default='hid', help='Configure the USB transport protocol, ' - 'either HID (default) or U2F') + 'either HID (default) or U2F (DEPRECATED, use `--transport` instead)') + parser.add_argument('-T', '--transport', default=None, choices=('HID', 'U2F', 'NFC'), + help='Configure the transport protocol: HID (default), U2F or NFC.') group = parser.add_argument_group('network arguments') group.add_argument('--apdu-port', default=9999, type=int, help='ApduServer TCP port') @@ -466,6 +469,12 @@ def main(prog=None) -> int: qemu_pid = run_qemu(s1, s2, args, use_bagl) s1.close() + # The `--transport` argument takes precedence over `--usb` + if args.transport is not None: + transport_type = TransportType[args.transport] + else: + transport_type = TransportType[args.usb.upper()] + apdu = apdu_server.ApduServer(host="0.0.0.0", port=args.apdu_port) seph = seproxyhal.SeProxyHal( s2, @@ -473,7 +482,7 @@ def main(prog=None) -> int: use_bagl=use_bagl, automation=automation_path, automation_server=automation_server, - transport=args.usb) + transport=transport_type) button = None if args.button_port: diff --git a/speculos/mcu/seproxyhal.py b/speculos/mcu/seproxyhal.py index 8eed5ca3..09810a7f 100644 --- a/speculos/mcu/seproxyhal.py +++ b/speculos/mcu/seproxyhal.py @@ -8,7 +8,7 @@ from typing import Callable, List, Optional, Tuple from speculos.observer import BroadcastInterface, TextEvent -from . import usb +from .transport import build_transport, TransportType from .automation import Automation from .display import DisplayNotifier, IODevice from .nbgl import NBGL @@ -30,6 +30,9 @@ class SephTag(IntEnum): USB_CONFIG = 0x4f USB_EP_PREPARE = 0x50 + NFC_RAPDU = 0x4A + NFC_POWER = 0x34 + REQUEST_STATUS = 0x52 RAPDU = 0x53 PLAY_TUNE = 0x56 @@ -253,7 +256,7 @@ def __init__(self, use_bagl: bool, automation: Optional[Automation] = None, automation_server: Optional[BroadcastInterface] = None, - transport: str = 'hid'): + transport: TransportType = TransportType.HID): self._socket = sock self.logger = logging.getLogger("seproxyhal") self.printf_queue = '' @@ -270,7 +273,7 @@ def __init__(self, self.socket_helper.wait_until_tick_is_processed) self.time_ticker_thread.start() - self.usb = usb.USB(self.socket_helper.queue_packet, transport=transport) + self.transport = build_transport(self.socket_helper.queue_packet, transport) self.ocr = OCR(model, use_bagl) @@ -389,10 +392,10 @@ def can_read(self, screen: DisplayNotifier): c(data) elif tag == SephTag.USB_CONFIG: - self.usb.config(data) + self.transport.config(data) elif tag == SephTag.USB_EP_PREPARE: - data = self.usb.prepare(data) + data = self.transport.prepare(data) if data: for c in self.apdu_callbacks: c(data) @@ -449,6 +452,13 @@ def can_read(self, screen: DisplayNotifier): assert isinstance(screen.display.gl, NBGL) screen.display.gl.hal_draw_image_file(data) + elif tag == SephTag.NFC_RAPDU: + data = self.transport.handle_rapdu(data) + if data is not None: + for c in self.apdu_callbacks: + c(data) + screen.display.forward_to_apdu_client(data) + else: self.logger.error(f"unknown tag: {tag:#x}") sys.exit(0) @@ -506,7 +516,7 @@ def to_app(self, packet: bytes): tag, packet = packet[4], packet[5:] self.socket_helper.queue_packet(SephTag(tag), packet) else: - self.usb.xfer(packet) + self.transport.send(packet) def get_tick_count(self): return self.socket_helper.get_tick_count() diff --git a/speculos/mcu/transport/__init__.py b/speculos/mcu/transport/__init__.py new file mode 100644 index 00000000..2def3fc8 --- /dev/null +++ b/speculos/mcu/transport/__init__.py @@ -0,0 +1,17 @@ +from typing import Callable + +from .interface import TransportLayer, TransportType +from .nfc import NFC +from .usb import HID, U2F + + +def build_transport(cb: Callable, transport: TransportType) -> TransportLayer: + if transport is TransportType.NFC: + return NFC(cb, transport) + elif transport is TransportType.U2F: + return U2F(cb, transport) + else: + return HID(cb, transport) + + +__all__ = ["build_transport", "TransportType"] diff --git a/speculos/mcu/transport/interface.py b/speculos/mcu/transport/interface.py new file mode 100644 index 00000000..2eff09a2 --- /dev/null +++ b/speculos/mcu/transport/interface.py @@ -0,0 +1,36 @@ +from abc import ABC, abstractmethod +from enum import auto, IntEnum +from typing import Callable, Optional + + +class TransportType(IntEnum): + HID = auto() + NFC = auto() + U2F = auto() + + +class TransportLayer(ABC): + + def __init__(self, send_cb: Callable, transport: TransportType): + self._transport = transport + self._send_cb = send_cb + + @property + def type(self) -> TransportType: + return self._transport + + @abstractmethod + def config(self, data: bytes) -> None: + raise NotImplementedError + + @abstractmethod + def prepare(self, data: bytes) -> Optional[bytes]: + raise NotImplementedError + + @abstractmethod + def send(self, data: bytes) -> None: + raise NotImplementedError + + @abstractmethod + def handle_rapdu(self, data: bytes) -> Optional[bytes]: + raise NotImplementedError diff --git a/speculos/mcu/transport/nfc.py b/speculos/mcu/transport/nfc.py new file mode 100644 index 00000000..3d308885 --- /dev/null +++ b/speculos/mcu/transport/nfc.py @@ -0,0 +1,75 @@ +""" +Forward NFC packets between the MCU and the SE +""" + +import enum +import logging +from typing import List, Optional + +from .interface import TransportLayer, TransportType + + +class SephNfcTag(enum.IntEnum): + NFC_APDU_EVENT = 0x1C + NFC_EVENT = 0x1E + + +class NFC(TransportLayer): + def __init__(self, send_cb, transport: TransportType): + super().__init__(send_cb, transport) + self.MTU = 140 + self.rx_sequence = 0 + self.rx_size = 0 + self.rx_data: bytes = b'' + self.logger = logging.getLogger("NFC") + + def config(self, data: bytes) -> None: + self.logger.warning("USB-specific 'config' method called on NFC transport. Ignored.") + + def prepare(self, data: bytes) -> None: + self.logger.warning("USB-specific 'prepare' method called on NFC transport. Ignored.") + + def handle_rapdu(self, data: bytes) -> Optional[bytes]: + """concatenate apdu chunks into full apdu""" + # example of data + # 0000050000002b3330000409312e302e302d72633104e600000008362e312e302d646508352e312e302d6465010001009000 + + # only APDU packets are supported + if data[2] != 0x05: + return None + + sequence = int.from_bytes(data[3:5], 'big') + assert self.rx_sequence == sequence, f"Unexpected sequence number:{sequence}" + + if sequence == 0: + self.rx_size = int.from_bytes(data[5:7], "big") + self.rx_data = data[7:] + else: + self.rx_data += data[5:] + + if len(self.rx_data) == self.rx_size: + # prepare for next call + self.rx_sequence = 0 + return self.rx_data + else: + self.rx_sequence += 1 + return None + + def send(self, data: bytes) -> None: + chunks: List[bytes] = [] + data_len = len(data) + + while len(data) > 0: + size = self.MTU - 5 + chunks.append(data[:size]) + data = data[size:] + + for i, chunk in enumerate(chunks): + # Ledger protocol header + header = bytes([0x00, 0x00, 0x05]) # APDU + header += i.to_bytes(2, "big") + # first packet contains the size of full buffer + if i == 0: + header += data_len.to_bytes(2, "big") + + self._send_cb(SephNfcTag.NFC_APDU_EVENT, header + chunk) diff --git a/speculos/mcu/usb.py b/speculos/mcu/transport/usb.py similarity index 51% rename from speculos/mcu/usb.py rename to speculos/mcu/transport/usb.py index d45e85ab..2c96fed7 100644 --- a/speculos/mcu/usb.py +++ b/speculos/mcu/transport/usb.py @@ -3,20 +3,22 @@ protocol. """ -from abc import ABC, abstractmethod -from construct import Int8ub, Int16ub, Int16ul, Struct -import binascii import enum import logging +from abc import abstractmethod +from construct import Int8ub, Int16ub, Int16ul, Struct +from typing import Callable, List, Optional + +from .interface import TransportLayer, TransportType -class UsbReq(enum.IntEnum): +class USBReq(enum.IntEnum): RECIPIENT_DEVICE = 0x00 SET_ADDRESS = 0x05 SET_CONFIGURATION = 0x09 -class SephUsbTag(enum.IntEnum): +class SephUSBTag(enum.IntEnum): XFER_SETUP = 0x01 XFER_IN = 0x02 XFER_OUT = 0x04 @@ -24,14 +26,14 @@ class SephUsbTag(enum.IntEnum): PREPARE_DIR_IN = 0x20 -class SephUsbConfig(enum.IntEnum): +class SephUSBConfig(enum.IntEnum): CONNECT = 0x01 DISCONNECT = 0x02 ADDR = 0x03 ENDPOINTS = 0x04 -class SephUsbPrepare(enum.IntEnum): +class SephUSBPrepare(enum.IntEnum): SETUP = 0x10 IN = 0x20 OUT = 0x30 @@ -39,19 +41,19 @@ class SephUsbPrepare(enum.IntEnum): UNSTALL = 0x80 -class HidEndpoint(enum.IntEnum): +class HIDEndpoint(enum.IntEnum): OUT_ADDR = 0x00 IN_ADDR = 0x80 -class UsbDevState(enum.IntEnum): +class USBDevState(enum.IntEnum): DISCONNECTED = 0 DEFAULT = 1 ADDRESSED = 2 CONFIGURED = 3 -class UsbInterface(enum.IntEnum): +class USBInterface(enum.IntEnum): GENERIC = 0 U2F = 1 HID = 2 @@ -81,7 +83,7 @@ class UsbInterface(enum.IntEnum): ) -class HidPacket: +class HIDPacket: def __init__(self): self.reset(0) @@ -103,72 +105,148 @@ def complete(self): return self.remaining_size == 0 -class Transport(ABC): - def __init__(self, interface, send_xfer): - self.interface = interface - self.send_xfer = send_xfer +class USBTransport(TransportLayer): + INTERFACE = USBInterface.GENERIC - @abstractmethod - def xfer(self, data): - pass + def __init__(self, send_cb: Callable, transport: TransportType = TransportType.HID): + super().__init__(send_cb, transport) + self.packets_to_send: List[bytes] = [] + self.state = USBDevState.DISCONNECTED + self.logger = logging.getLogger("USB") + + @property + def endpoint_in(self): + return HIDEndpoint.IN_ADDR | self.INTERFACE + + @property + def endpoint_out(self): + return HIDEndpoint.OUT_ADDR | self.INTERFACE + + def _send_xfer(self, packet: bytes) -> None: + # don't send packets until the endpoint is configured + if self.state != USBDevState.CONFIGURED or len(self.packets_to_send) > 0: + self.packets_to_send.append(packet) + return + + self.logger.debug("[SEND_XFER] %s", packet.hex()) + self._send_cb(SephUSBTag.XFER_EVENT, packet) + + def _send_setup(self, breq: USBReq, wValue: int): + data = usb_header.build(dict(endpoint=self.endpoint_out, tag=SephUSBTag.XFER_SETUP, length=0)) + data += usb_setup.build(dict(bmreq=USBReq.RECIPIENT_DEVICE, breq=breq, wValue=wValue, wIndex=0, wLength=0)) + self.logger.debug("[SEND_SETUP] %s", data.hex()) + self._send_cb(SephUSBTag.XFER_EVENT, data) + + def _flush_packets(self) -> None: + packets_to_send = self.packets_to_send + self.packets_to_send = [] + for packet in packets_to_send: + self._send_xfer(packet) + + def handle_rapdu(self, data: bytes) -> Optional[bytes]: + self.logger.warning("NFC-specific 'handle_apdu' method called on USB transport. Ignored.") + return None @abstractmethod - def build_xfer(self, data): - pass + def _config(self, data: SephUSBConfig) -> None: + raise NotImplementedError + + def config(self, data: bytes) -> None: + """Parse a config packet. If the endpoint address is set, configure it.""" + + tag = SephUSBConfig(data[0]) + self.logger.debug("[CONFIG] %s %s", repr(tag), data.hex()) + + # The USB stack is shut down with USB_power(0) before being powered on. + # Wait for the first CONNECT config message to ensure that USBD_Start() + # has been called. + if tag == SephUSBConfig.CONNECT: + if self.state == USBDevState.DISCONNECTED: + self.state = USBDevState.ADDRESSED + self.logger.debug("set_address sent") + self._send_setup(USBReq.SET_ADDRESS, 1) + + elif tag == SephUSBConfig.DISCONNECT: + self.state = USBDevState.DISCONNECTED + self._config(tag) + + elif tag == SephUSBConfig.ADDR: + if self.state == USBDevState.ADDRESSED: + self.state = USBDevState.CONFIGURED + self._send_setup(USBReq.SET_CONFIGURATION, 1) + self.logger.debug("USB configured") + + elif tag == SephUSBConfig.ENDPOINTS: + # once the endpoint is configured, queued packets can be sent + endpoint = data[2] + if endpoint == self.endpoint_out: + self._flush_packets() @abstractmethod - def prepare(self, data): - pass + def _prepare(self, data: bytes) -> Optional[bytes]: + raise NotImplementedError - def config(self, tag): - pass + def prepare(self, data: bytes) -> Optional[bytes]: + """Send or receive a packet chunk.""" - @property - def endpoint_in(self): - return HidEndpoint.IN_ADDR | self.interface + header = usb_header.parse(data[:3]) + answer = None + tag = SephUSBPrepare(header.tag) + self.logger.debug("[PREPARE] %s %s %s", repr(self.state), repr(tag), data.hex()) - @property - def endpoint_out(self): - return HidEndpoint.OUT_ADDR | self.interface + if tag == SephUSBPrepare.IN: + if header.endpoint == self.endpoint_in: + assert header.length == USB_SIZE + data = data[usb_header.sizeof():] + answer = self._prepare(data) + return answer + + +class U2F(USBTransport): + INTERFACE = USBInterface.U2F -class U2f(Transport): - def __init__(self, send_xfer): - super().__init__(UsbInterface.U2F, send_xfer) + def __init__(self, send_cb: Callable, transport: TransportType): + super().__init__(send_cb, transport) - def build_xfer(self, tag, data): + def _config(self, data: SephUSBConfig) -> None: + pass + + def _build_xfer(self, tag: SephUSBTag, data: bytes) -> bytes: packet = usb_header.build(dict(endpoint=self.endpoint_out, tag=tag, length=len(data))) packet += data return packet - def xfer(self, data): + def send(self, data: bytes) -> None: assert len(data) == USB_SIZE - packet = self.build_xfer(SephUsbTag.XFER_OUT, data) - self.send_xfer(packet) + packet = self._build_xfer(SephUSBTag.XFER_OUT, data) + self._send_xfer(packet) - def prepare(self, data): + def _prepare(self, data: bytes) -> bytes: assert len(data) == USB_SIZE - packet = self.build_xfer(SephUsbTag.XFER_IN, b'') - self.send_xfer(packet) + packet = self._build_xfer(SephUSBTag.XFER_IN, b'') + self._send_xfer(packet) return data -class Hid(Transport): +class HID(USBTransport): + INTERFACE = USBInterface.HID + USB_CHANNEL = 0x0101 USB_COMMAND = 0x05 - def __init__(self, send_xfer): - super().__init__(UsbInterface.HID, send_xfer) - self.hid_packet = HidPacket() + def __init__(self, send_cb: Callable, transport: TransportType): + super().__init__(send_cb, transport) + self.hid_packet = HIDPacket() - def _build_header(self, data, length, seq): + def _build_header(self, data: bytes, length: int, seq: int) -> bytes: header = hid_header.build(dict(channel=self.USB_CHANNEL, command=self.USB_COMMAND, seq=seq, length=length)) if seq != 0: # strip hid_header.length header = header[:-2] return header - def build_xfer(self, tag, data, seq=0, length=USB_SIZE): + def _build_xfer(self, tag: SephUSBTag, data: bytes, seq: int = 0, length: int = USB_SIZE): header = self._build_header(data, length, seq) size = len(header) + len(data) @@ -178,7 +256,7 @@ def build_xfer(self, tag, data, seq=0, length=USB_SIZE): return packet - def xfer(self, data): + def send(self, data: bytes) -> None: seq = 0 offset = 0 while offset < len(data): @@ -192,17 +270,17 @@ def xfer(self, data): else: length = len(chunk) - packet = self.build_xfer(SephUsbTag.XFER_OUT, chunk, seq, length) - self.send_xfer(packet) + packet = self._build_xfer(SephUSBTag.XFER_OUT, chunk, seq, length) + self._send_xfer(packet) offset += len(chunk) seq += 1 - def config(self, tag): - if tag == UsbDevState.DISCONNECTED: + def _config(self, tag: SephUSBConfig) -> None: + if tag == USBDevState.DISCONNECTED: self.hid_packet.reset(0) - def prepare(self, data): + def _prepare(self, data: bytes) -> Optional[bytes]: hid = hid_header.parse(data) assert hid.channel == self.USB_CHANNEL assert hid.command == self.USB_COMMAND @@ -215,8 +293,8 @@ def prepare(self, data): chunk = data[hid_header.sizeof() - 2:] self.hid_packet.append_data(chunk) - packet = self.build_xfer(SephUsbTag.XFER_IN, b'', self.hid_packet.seq) - self.send_xfer(packet) + packet = self._build_xfer(SephUSBTag.XFER_IN, b'', self.hid_packet.seq) + self._send_xfer(packet) if self.hid_packet.complete(): answer = self.hid_packet.data @@ -225,90 +303,3 @@ def prepare(self, data): answer = None return answer - - -class USB: - def __init__(self, _queue_event_packet, transport='hid'): - self._queue_event_packet = _queue_event_packet - self.packets_to_send = [] - self.state = UsbDevState.DISCONNECTED - - if transport.lower() == 'hid': - self.transport = Hid(self.send_xfer) - elif transport.lower() == 'u2f': - self.transport = U2f(self.send_xfer) - else: - raise ValueError(f"Unsupported USB transport {transport!r}") - - self.logger = logging.getLogger("usb") - - def send_xfer(self, packet): - # don't send packets until the endpoint is configured - if self.state != UsbDevState.CONFIGURED or len(self.packets_to_send) > 0: - self.packets_to_send.append(packet) - return - - self.logger.debug("[SEND_XFER] {}".format(binascii.hexlify(packet))) - self._queue_event_packet(SephUsbTag.XFER_EVENT, packet) - - def _send_setup(self, breq, wValue): - data = usb_header.build(dict(endpoint=self.transport.endpoint_out, tag=SephUsbTag.XFER_SETUP, length=0)) - data += usb_setup.build(dict(bmreq=UsbReq.RECIPIENT_DEVICE, breq=breq, wValue=wValue, wIndex=0, wLength=0)) - self.logger.debug("[SEND_SETUP] {}".format(binascii.hexlify(data))) - self._queue_event_packet(SephUsbTag.XFER_EVENT, data) - - def _flush_packets(self): - packets_to_send = self.packets_to_send - self.packets_to_send = [] - for packet in packets_to_send: - self.send_xfer(packet) - - def config(self, data): - """Parse a config packet. If the endpoint address is set, configure it.""" - - tag = SephUsbConfig(data[0]) - self.logger.debug("[CONFIG] {} {}".format(repr(tag), binascii.hexlify(data))) - - # The USB stack is shut down with USB_power(0) before being powered on. - # Wait for the first CONNECT config message to ensure that USBD_Start() - # has been called. - if tag == SephUsbConfig.CONNECT: - if self.state == UsbDevState.DISCONNECTED: - self.state = UsbDevState.ADDRESSED - self.logger.debug("set_address sent") - self._send_setup(UsbReq.SET_ADDRESS, 1) - - elif tag == SephUsbConfig.DISCONNECT: - self.state = UsbDevState.DISCONNECTED - self.transport.config(tag) - - elif tag == SephUsbConfig.ADDR: - if self.state == UsbDevState.ADDRESSED: - self.state = UsbDevState.CONFIGURED - self._send_setup(UsbReq.SET_CONFIGURATION, 1) - self.logger.debug("configured") - - elif tag == SephUsbConfig.ENDPOINTS: - # once the endpoint is configured, queued packets can be sent - endpoint = data[2] - if endpoint == self.transport.endpoint_out: - self._flush_packets() - - def prepare(self, data): - """Send or receive a packet chunk.""" - - header = usb_header.parse(data[:3]) - answer = None - tag = SephUsbPrepare(header.tag) - self.logger.debug("[PREPARE] {} {} {}".format(repr(self.state), repr(tag), binascii.hexlify(data))) - - if tag == SephUsbPrepare.IN: - if header.endpoint == self.transport.endpoint_in: - assert header.length == USB_SIZE - data = data[usb_header.sizeof():] - answer = self.transport.prepare(data) - - return answer - - def xfer(self, data): - self.transport.xfer(data) diff --git a/src/bolos/os.c b/src/bolos/os.c index 596a13dd..ccbe8a67 100644 --- a/src/bolos/os.c +++ b/src/bolos/os.c @@ -9,6 +9,7 @@ #define OS_SETTING_PLANEMODE_OLD 5 #define OS_SETTING_PLANEMODE_NEW 6 #define OS_SETTING_SOUND 9 +#define OS_SETTING_FEATURES 14 #undef PATH_MAX #define PATH_MAX 1024 @@ -54,7 +55,7 @@ unsigned long sys_os_setting_get(unsigned int setting_id, return 1; } if (((hw_model == MODEL_STAX) || (hw_model == MODEL_FLEX)) && - setting_id == OS_SETTING_SOUND) { + (setting_id == OS_SETTING_SOUND || setting_id == OS_SETTING_FEATURES)) { return 0xff; } }