Skip to content

Commit a3efc88

Browse files
committed
Initial parts of new API
Adds API surface for most bits, still needs discovery
1 parent 1f39ac1 commit a3efc88

File tree

8 files changed

+981
-0
lines changed

8 files changed

+981
-0
lines changed

sbot/future/classless/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .robot import arduino, comp, motor, power, servo, utils
2+
3+
__all__ = ['arduino', 'comp', 'motor', 'power', 'servo', 'utils']

sbot/future/classless/arduinos.py

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
"""The Arduino module provides an interface to the Arduino firmware."""
2+
from enum import Enum, IntEnum
3+
4+
from sbot.logging import log_to_debug
5+
from sbot.serial_wrapper import SerialWrapper
6+
from sbot.utils import map_to_float
7+
8+
from .utils import BoardManager
9+
10+
11+
class GPIOPinMode(str, Enum):
12+
"""The possible modes for a GPIO pin."""
13+
14+
INPUT = 'INPUT'
15+
INPUT_PULLUP = 'INPUT_PULLUP'
16+
OUTPUT = 'OUTPUT'
17+
18+
19+
class AnalogPin(IntEnum):
20+
"""The analog pins on the Arduino."""
21+
22+
A0 = 14
23+
A1 = 15
24+
A2 = 16
25+
A3 = 17
26+
A4 = 18
27+
A5 = 19
28+
29+
30+
DISABLED_PINS = (0, 1)
31+
AVAILABLE_PINS = range(0, max(AnalogPin) + 1)
32+
33+
ADC_MAX = 1023 # 10 bit ADC
34+
ADC_MIN = 0
35+
36+
37+
class Arduino:
38+
"""
39+
The Arduino board interface.
40+
41+
This is intended to be used with Arduino Uno boards running the sbot firmware.
42+
43+
:param boards: The BoardManager object containing the arduino board references.
44+
"""
45+
46+
__slots__ = ('_boards',)
47+
48+
def __init__(self, boards: BoardManager):
49+
# Obtain a reference to the arduino
50+
# This is contained in a list to allow for it to be populated later
51+
self._boards = boards.arduino
52+
53+
@log_to_debug
54+
def set_pin_mode(self, pin: int, mode: GPIOPinMode) -> None:
55+
"""
56+
Set the mode of the pin.
57+
58+
To do analog or digital reads set the mode to INPUT or INPUT_PULLUP.
59+
To do digital writes set the mode to OUTPUT.
60+
61+
:param pin: The pin to set the mode of.
62+
:param value: The mode to set the pin to.
63+
:raises IOError: If the pin mode is not a GPIOPinMode.
64+
:raises IOError: If this pin cannot be controlled.
65+
"""
66+
port = self._get_port()
67+
self._validate_pin(pin)
68+
if not isinstance(mode, GPIOPinMode):
69+
raise IOError('Pin mode only supports being set to a GPIOPinMode')
70+
port.write(f'PIN:{pin}:MODE:SET:{mode.value}')
71+
72+
@log_to_debug
73+
def digital_read(self, pin: int) -> bool:
74+
"""
75+
Perform a digital read on the pin.
76+
77+
:param pin: The pin to read from.
78+
:raises IOError: If the pin's current mode does not support digital read
79+
:raises IOError: If this pin cannot be controlled.
80+
:return: The digital value of the pin.
81+
"""
82+
port = self._get_port()
83+
self._validate_pin(pin)
84+
response = port.query(f'PIN:{pin}:DIGITAL:GET?')
85+
return (response == '1')
86+
87+
@log_to_debug
88+
def digital_write(self, pin: int, value: bool) -> None:
89+
"""
90+
Write a digital value to the pin.
91+
92+
:param pin: The pin to write to.
93+
:param value: The value to write to the pin.
94+
:raises IOError: If the pin's current mode does not support digital write.
95+
:raises IOError: If this pin cannot be controlled.
96+
"""
97+
port = self._get_port()
98+
self._validate_pin(pin)
99+
try:
100+
if value:
101+
port.write(f'PIN:{pin}:DIGITAL:SET:1')
102+
else:
103+
port.write(f'PIN:{pin}:DIGITAL:SET:0')
104+
except RuntimeError as e:
105+
# The firmware returns a NACK if the pin is not in OUTPUT mode
106+
if 'is not supported in' in str(e):
107+
raise IOError(str(e))
108+
109+
@log_to_debug
110+
def analog_read(self, pin: int) -> float:
111+
"""
112+
Get the analog voltage on the pin.
113+
114+
This is returned in volts. Only pins A0-A5 support analog reads.
115+
116+
:param pin: The pin to read from.
117+
:raises IOError: If the pin or its current mode does not support analog read.
118+
:raises IOError: If this pin cannot be controlled.
119+
:return: The analog voltage on the pin, ranges from 0 to 5.
120+
"""
121+
port = self._get_port()
122+
self._validate_pin(pin)
123+
if pin not in AnalogPin:
124+
raise IOError('Pin does not support analog read')
125+
try:
126+
response = port.query(f'PIN:{pin}:ANALOG:GET?')
127+
except RuntimeError as e:
128+
# The firmware returns a NACK if the pin is not in INPUT mode
129+
if 'is not supported in' in str(e):
130+
raise IOError(str(e))
131+
# map the response from the ADC range to the voltage range
132+
return map_to_float(int(response), ADC_MIN, ADC_MAX, 0.0, 5.0)
133+
134+
@log_to_debug
135+
def measure_ultrasound_distance(self, pulse_pin: int, echo_pin: int) -> int:
136+
"""
137+
Measure the distance to an object using an ultrasound sensor.
138+
139+
The sensor can only measure distances up to 4000mm.
140+
141+
:param pulse_pin: The pin to send the ultrasound pulse from.
142+
:param echo_pin: The pin to read the ultrasound echo from.
143+
:raises ValueError: If either of the pins are invalid
144+
:return: The distance measured by the ultrasound sensor in mm.
145+
"""
146+
port = self._get_port()
147+
try: # bounds check
148+
self._validate_pin(pulse_pin)
149+
except (IndexError, IOError):
150+
raise ValueError("Invalid pulse pin provided") from None
151+
try:
152+
self._validate_pin(echo_pin)
153+
except (IndexError, IOError):
154+
raise ValueError("Invalid echo pin provided") from None
155+
156+
response = port.query(f'ULTRASOUND:{pulse_pin}:{echo_pin}:MEASURE?')
157+
return int(response)
158+
159+
def _validate_pin(self, pin: int) -> None:
160+
if pin in DISABLED_PINS:
161+
raise IOError('This pin cannot be controlled.')
162+
if pin not in AVAILABLE_PINS:
163+
raise IndexError(f'Pin {pin} is not available on the Arduino.')
164+
165+
def _get_port(self) -> SerialWrapper:
166+
if len(self._boards) == 0:
167+
raise RuntimeError("No Arduino connected")
168+
return self._boards[0]
169+
170+
def __repr__(self) -> str:
171+
try:
172+
port = self._get_port()
173+
except RuntimeError:
174+
return f"<{self.__class__.__qualname__} no arduino connected>"
175+
else:
176+
return f"<{self.__class__.__qualname__} {port}>"

sbot/future/classless/comp.py

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
"""
2+
Implementation of loading metadata.
3+
4+
Metadata is a dictionary of information about the environment that the robot is running in.
5+
It usually includes the starting zone and a flag indicating whether we are in
6+
competition or development mode. Metadata is stored in a JSON file, typically on a
7+
competition USB stick. The environment variable SBOT_METADATA_PATH specifies a directory
8+
where it, and its children, are searched for the JSON file to load.
9+
10+
Example metadata file:
11+
```json
12+
{
13+
"zone": 2,
14+
"is_competition": true
15+
}
16+
```
17+
"""
18+
from __future__ import annotations
19+
20+
import json
21+
import logging
22+
import os
23+
from pathlib import Path
24+
from typing import TypedDict
25+
26+
from sbot.exceptions import MetadataKeyError, MetadataNotReadyError
27+
28+
logger = logging.getLogger(__name__)
29+
30+
# The name of the environment variable that specifies the path to search
31+
# for metadata USB sticks
32+
METADATA_ENV_VAR = "SBOT_METADATA_PATH"
33+
# The name of the metadata file
34+
METADATA_NAME = "metadata.json"
35+
36+
37+
class Metadata(TypedDict):
38+
"""
39+
The structure of the metadata dictionary.
40+
41+
:param is_competition: Whether the robot is in competition mode
42+
:param zone: The zone that the robot is in
43+
"""
44+
45+
is_competition: bool
46+
zone: int
47+
48+
49+
# The default metadata to use if no file is found
50+
DEFAULT_METADATA: Metadata = {
51+
"is_competition": False,
52+
"zone": 0,
53+
}
54+
55+
56+
class Comp:
57+
"""
58+
A collection of the robot metadata.
59+
60+
This class is used to load and access the metadata of the robot.
61+
"""
62+
63+
def __init__(self) -> None:
64+
self._metadata: Metadata | None = None
65+
66+
@property
67+
def is_competition(self) -> bool:
68+
"""
69+
Whether the robot is in a competition environment.
70+
71+
This value is not available until wait_start has been called.
72+
73+
:raises MetadataNotReadyError: If the metadata has not been loaded
74+
"""
75+
if self._metadata is None:
76+
raise MetadataNotReadyError()
77+
return self._metadata["is_competition"]
78+
79+
@property
80+
def zone(self) -> int:
81+
"""
82+
The zone that the robot is in.
83+
84+
This value is not available until wait_start has been called.
85+
86+
:raises MetadataNotReadyError: If the metadata has not been loaded
87+
"""
88+
if self._metadata is None:
89+
raise MetadataNotReadyError()
90+
return self._metadata["zone"]
91+
92+
def _load(self) -> None:
93+
"""
94+
Search for a metadata file and load it.
95+
96+
Searches the path identified by SBOT_METADATA_PATH and its children for
97+
metadata.json (set by METADATA_NAME) and reads it.
98+
"""
99+
search_path = os.environ.get(METADATA_ENV_VAR)
100+
if search_path:
101+
search_root = Path(search_path)
102+
if not search_root.is_dir():
103+
logger.error(f"Metaddata path {search_path} does not exist")
104+
return
105+
for item in Path(search_path).iterdir():
106+
try:
107+
if item.is_dir() and (item / METADATA_NAME).exists():
108+
self._metadata = _load_metadata(item / METADATA_NAME)
109+
elif item.name == METADATA_NAME:
110+
self._metadata = _load_metadata(item)
111+
return
112+
except PermissionError:
113+
logger.debug(f"Unable to read {item}")
114+
else:
115+
logger.info(f"No JSON metadata files found in {search_path}")
116+
else:
117+
logger.info(f"{METADATA_ENV_VAR} not set, not loading metadata")
118+
119+
self._metadata = DEFAULT_METADATA
120+
121+
122+
def _load_metadata(path: Path) -> Metadata:
123+
"""
124+
Load the metadata from a JSON file, found by `load`.
125+
126+
The file must be a JSON dictionary with the keys `is_competition` and `zone`.
127+
128+
:param path: The path to the metadata file
129+
:raises RuntimeError: If the metadata file is invalid JSON
130+
:raises TypeError: If the metadata file is not a JSON dictionary
131+
:raises MetadataKeyError: If the metadata file is missing a required key
132+
:return: The metadata dictionary
133+
"""
134+
logger.info(f"Loading metadata from {path}")
135+
with path.open() as file:
136+
try:
137+
obj: Metadata = json.load(file)
138+
except json.decoder.JSONDecodeError as e:
139+
raise RuntimeError("Unable to load metadata.") from e
140+
141+
if not isinstance(obj, dict):
142+
raise TypeError(f"Found metadata file, but format is invalid. Got: {obj}")
143+
144+
# check required keys exist at runtime
145+
for key in Metadata.__annotations__.keys():
146+
if key not in obj.keys():
147+
raise MetadataKeyError(key)
148+
149+
return obj

0 commit comments

Comments
 (0)