Skip to content

Commit

Permalink
Merge remote-tracking branch 'refs/remotes/origin/main' into steezy-p…
Browse files Browse the repository at this point in the history
…arking
  • Loading branch information
HeadTriXz committed Jun 11, 2024
2 parents 005f6ff + aba5f1a commit 8160074
Show file tree
Hide file tree
Showing 37 changed files with 483 additions and 323 deletions.
64 changes: 2 additions & 62 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,66 +3,6 @@ __pycache__/
*.py[cod]
*$py.class

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/

# PyBuilder
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
.python-version

# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/

# Environments
.env
Expand All @@ -80,8 +20,8 @@ venv.bak/
.idea
.vscode

**/.benchmarks
**/backups
# Data
**/can_recordings

# Compiled object detection model
*_openvino_model/
Expand Down
2 changes: 1 addition & 1 deletion .ruff.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ ignore = [
"PERF203",
]

exclude = ["scripts/**"]
exclude = ["scripts/python/original_controller.py"]

# Ignore unused imports in __init__.py files (would need __all__ otherwise)
[lint.per-file-ignores]
Expand Down
6 changes: 6 additions & 0 deletions configs/config.defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ preprocessing:
center: 1.0
right: 0.62
white_threshold: 180
filter_threshold: 170

line_detection:
max_angle_difference: 30 # degrees
Expand Down Expand Up @@ -58,6 +59,7 @@ line_following:

initial_speed: 5 # km/h

no_lane_offset: 1.5 # meters
max_steering_range: 20 # degrees
look_ahead_distance: .5 # meters

Expand Down Expand Up @@ -113,6 +115,10 @@ traffic_light:

overtake:
consecutive_frames: 3
force_move:
enabled: true
angle: 1.0 # percentage
duration: 0.5 # seconds
force_return:
enabled: true
angle: 1.0 # percentage
Expand Down
Binary file removed data/images/stitched/chicane.jpg
Binary file not shown.
Binary file removed data/images/stitched/stop.jpg
Binary file not shown.
Binary file removed data/images/stitched/turn.jpg
Binary file not shown.
78 changes: 33 additions & 45 deletions scripts/python/braking_calibration.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,23 @@
import can
import logging
import time

from os import system
from typing import Any

from src.driving.can import CANController
from src.driving.gamepad import (
EventType,
Gamepad,
GamepadAxis,
GamepadButton
)
from src.driving.can import CANController, get_can_bus
from src.driving.gamepad import EventType, Gamepad, GamepadAxis, GamepadButton
from src.driving.modes import ManualDriving

config = {
"braking_force_min": 0,
"braking_force_max": 100,
"braking_force_step": 5,
}

config = {"braking_force_min": 0, "braking_force_max": 100, "braking_force_step": 5}


class BrakeCalibrationProcedure:
"""Procedure to calibrate the braking force.
this procedure will calibrate the braking force. it will do this by
applying the brakes and checking if the wheels lock up. if they do we
will lower the amount of braking force. if they don't we will increase
the amount of braking force. this will be done using a binary search
This procedure will calibrate the braking force. It will do this by
applying the brakes and checking if the wheels lock up. If they do we
will lower the amount of braking force. If they don't we will increase
the amount of braking force. This will be done using a binary search
algorithm.
Attributes
Expand Down Expand Up @@ -56,28 +49,24 @@ def __init__(self, can_controller: CANController, gamepad: Gamepad) -> None:
# register the buttons
self.gamepad.add_listener(GamepadButton.RB, EventType.BUTTON_DOWN, self.start_procedure)
self.braking_steps = list(
range(
config["braking_force_min"],
config["braking_force_max"] + 1,
config["braking_force_step"],
)
range(config["braking_force_min"], config["braking_force_max"] + 1, config["braking_force_step"])
)

self.low_idx = 0
self.high_idx = len(self.braking_steps) - 1
self.middle_idx = self.low_idx + (self.high_idx - self.low_idx) // 2
self.logger = logging.getLogger(__name__)

def start_procedure(self, *args, **kwargs) -> None:
def start_procedure(self, *_args: Any, **_kwargs: Any) -> None:
"""Start the brake calibration procedure.
this will register all needed callbacks. next to this it often won't do
anything else. this function should be called when the B button is pressed.
This will register all needed callbacks. Next to this it often won't do
anything else. This function should be called when the B button is pressed.
"""
if self.started:
self.logger.warning("brake calibration already started")
logging.warning("brake calibration already started")
return

self.logger.info("start brake calibration")
logging.info("start brake calibration")
self.gamepad.add_listener(GamepadAxis.DPAD_X, EventType.AXIS_CHANGED, self.__arrow_pressed)
self.gamepad.add_listener(GamepadAxis.DPAD_Y, EventType.AXIS_CHANGED, self.__arrow_pressed)
self.gamepad.add_listener(GamepadButton.LB, EventType.BUTTON_DOWN, self.__start_braking)
Expand All @@ -96,36 +85,37 @@ def __arrow_pressed(self, button: GamepadAxis, _event: EventType, val: float) ->

if button == GamepadAxis.DPAD_Y:
if val == 1:
self.logger.info("select lockup")
logging.info("select lockup")
self.locked = True
elif val == -1:
self.logger.info("select non lockup")
logging.info("select non lockup")
self.locked = False
if button == GamepadAxis.DPAD_X:
self.logger.info("confirm")
logging.info("confirm")
if val == 1:
self.__confirm_lockup()

def __confirm_lockup(self) -> None:
"""Event listener to confirm the lockup.
if this function is called we will adjust the range of braking force
according to binary sort. if we locked up we will lower the amount.
if we didn't lock up we will increase the amount.
If this function is called we will adjust the range of braking force
according to binary sort. If we locked up we will lower the amount.
If we didn't lock up we will increase the amount.
"""
# check if we have braked.
if not self.braked:
self.logger.info("brake first")
logging.info("brake first")
return

self.braked = False

with open("braking_force.txt", "w") as f:
if self.low_idx >= self.high_idx:
f.write(f"braking force calibrated: {self.braking_steps[self.middle_idx]}")
self.logger.info("braking force calibrated: %d", self.braking_steps[self.middle_idx])
logging.info("braking force calibrated: %d", self.braking_steps[self.middle_idx])

f.write(f"low: {self.low_idx} high: {self.high_idx} middle: {self.middle_idx}")
self.logger.info("low: %d high: %d middle: %d", self.low_idx, self.high_idx, self.middle_idx)
logging.info("low: %d high: %d middle: %d", self.low_idx, self.high_idx, self.middle_idx)
if self.locked:
self.high_idx = self.middle_idx - 1
else:
Expand All @@ -135,22 +125,20 @@ def __confirm_lockup(self) -> None:
self.locked = False
self.middle_idx = self.low_idx + (self.high_idx - self.low_idx) // 2

def __start_braking(self, *args, **kwargs) -> None:
def __start_braking(self, *_args: Any, **_kwargs: Any) -> None:
"""Start braking at the set pressure."""
self.can_controller.set_brake(self.braking_steps[self.middle_idx])

def __stop_braking(self, *args, **kwargs) -> None:
def __stop_braking(self, *_args: Any, **_kwargs: Any) -> None:
"""Stop braking."""
self.can_controller.set_brake(0)
self.braked = True


if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
system("ip link set can0 type can bitrate 500000")
system("ip link set can0 up")
logging.basicConfig(level=logging.INFO)

can_bus = can.interface.Bus(interface="socketcan", channel="can0", bitrate=500000)
can_bus = get_can_bus()
can_controller = CANController(can_bus)
can_controller.start()

Expand All @@ -163,4 +151,4 @@ def __stop_braking(self, *args, **kwargs) -> None:
brake_calibration = BrakeCalibrationProcedure(can_controller, gamepad)

while True:
pass
time.sleep(1)
6 changes: 3 additions & 3 deletions scripts/python/calibrate_cameras.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ def calibrate_cameras() -> None:
right_image = cam_right.next()

# Stop the camera streams.
cam_left.stop()
cam_center.stop()
cam_right.stop()
cam_left.stop(detach=True)
cam_center.stop(detach=True)
cam_right.stop(detach=True)

calibrate_images([left_image, center_image, right_image])

Expand Down
94 changes: 94 additions & 0 deletions scripts/python/can_recorder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import can
import logging
import threading
import time

from datetime import datetime
from pathlib import Path
from typing import Any, Callable

from src.constants import CANControlIdentifier
from src.driving.can import CANController, get_can_bus
from src.driving.gamepad import EventType, Gamepad, GamepadButton
from src.driving.modes import ManualDriving


class CANRecorder:
"""Procedure to record CAN message.
This procedure can be used to record messages send on the CAN bus.
This will record all the messages we can send and not the ones we receive.
"""

__can_bus: can.Bus
__recording: bool = False
__recorder: threading.Thread = None

def __init__(self) -> None:
"""Procedure to record CAN messages."""
self.__can_bus = get_can_bus()
self.__can_bus.set_filters(
[{"can_id": can_id, "can_mask": 0xFFF, "extended": False} for can_id in CANControlIdentifier]
)

def toggle_recording(self) -> None:
"""Toggle the recording of CAN messages.
This function will start recording CAN messages if it is not already recording,
and stop recording if it is already recording.
"""
if not self.__recording:
path = Path(f"./data/can_recordings/{datetime.now().strftime("%m_%d_%Y_%H_%M_%S")}.asc")
path.parent.mkdir(parents=True, exist_ok=True)

logging.info("Recording CAN messages to %s", path)
self.__recorder = threading.Thread(target=self.__recording_thread, args=(path,), daemon=True)
self.__recorder.start()
else:
self.__recording = False
self.__recorder.join(1)

self.__can_bus.shutdown()
self.__can_bus = None

logging.info("Stopped recording CAN messages")

def __recording_thread(self, filepath: Path) -> None:
"""Record CAN messages into a .asc file."""
self.__recording = True

with can.ASCWriter(filepath) as writer:
while self.__recording:
msg = self.__can_bus.recv(1)
if msg is not None:
writer.on_message_received(msg)


def create_toggle_callback(can_recorder: CANRecorder, gamepad: Gamepad) -> Callable[[Any, Any], None]:
"""Create the callback for the toggle of the CAN recording."""

def __toggle(*_args: Any, **_kwargs: Any) -> None:
can_recorder.toggle_recording()
gamepad.vibrate()

return __toggle


if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)

can_bus = get_can_bus()
can_controller = CANController(can_bus)
can_controller.start()

gamepad = Gamepad()
gamepad.start()

controller_driving = ManualDriving(gamepad, can_controller)
controller_driving.start()

can_recorder = CANRecorder()
gamepad.add_listener(GamepadButton.LB, EventType.LONG_PRESS, create_toggle_callback(can_recorder, gamepad))

while True:
time.sleep(1)
Loading

0 comments on commit 8160074

Please sign in to comment.