Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
271 changes: 87 additions & 184 deletions bpod_core/bpod.py → bpod_core/bpod/__init__.py

Large diffs are not rendered by default.

50 changes: 50 additions & 0 deletions bpod_core/bpod/abc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""Abstract base classes used by the bpod module."""

from abc import ABC, abstractmethod

from bpod_core.bpod.structs import HardwareConfiguration, VersionInfo
from bpod_core.misc import DocstringInheritanceMixin


class AbstractBpod(DocstringInheritanceMixin, ABC):
"""Abstract base class for Bpod objects."""

_version: VersionInfo
_hardware: HardwareConfiguration
_serial_number: str

@property
@abstractmethod
def name(self) -> str | None:
"""The Bpod's user-defined name, or :obj:`None` if not set."""

@property
@abstractmethod
def location(self) -> str | None:
"""The Bpod's user-defined location, or :obj:`None` if not set."""

@property
def version(self) -> VersionInfo:
"""Version information of the Bpod's firmware and hardware."""
return self._version

@property
def serial_number(self) -> str:
"""The Bpod's unique serial number."""
return self._serial_number

@abstractmethod
def set_status_led(self, enabled: bool) -> bool:
"""
Enable or disable the Bpod's status LED.

Parameters
----------
enabled : bool
True to enable the status LED, False to disable.

Returns
-------
bool
True if the operation was successful, False otherwise.
"""
36 changes: 36 additions & 0 deletions bpod_core/bpod/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""Constants used by the bpod module."""

from bpod_core.constants import PLATFORMDIRS, VID_TEENSY, TeensyPID

VIDS_BPOD = [VID_TEENSY]
"""Vendor IDs of supported Bpod devices"""

PIDS_BPOD = [TeensyPID.SERIAL, TeensyPID.DUAL_SERIAL, TeensyPID.TRIPLE_SERIAL]
"""List of Product IDs of supported Bpod devices"""

MIN_BPOD_FW_VERSION = (23, 0)
"""minimum supported firmware version (major, minor)"""

MIN_BPOD_HW_VERSION = 3
"""minimum supported hardware version"""

MAX_BPOD_HW_VERSION = 4
"""maximum supported hardware version"""

CHANNEL_TYPES_INPUT = {
b'U': 'Serial',
b'X': 'SoftCode',
b'Z': 'SoftCodeApp',
b'F': 'Flex',
b'D': 'Digital',
b'B': 'BNC',
b'W': 'Wire',
b'P': 'Port',
}
CHANNEL_TYPES_OUTPUT = CHANNEL_TYPES_INPUT.copy()
CHANNEL_TYPES_OUTPUT.update({b'V': 'Valve', b'P': 'PWM'})
N_SERIAL_EVENTS_DEFAULT = 15
VALID_OPERATORS = {'exit', '>exit', '>back'}
MACHINE_TYPES = {3: 'r2.0-2.5', 4: '2+ r1.0'}
CONFIG_PATH = PLATFORMDIRS.user_config_path
DISCOVERY_TIMEOUT = 0.11
81 changes: 81 additions & 0 deletions bpod_core/bpod/structs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""Data structures used by the bpod module."""

import msgspec


class BpodSettings(msgspec.Struct):
"""Settings for a specific Bpod device."""

serial_number: str
"""Serial number of the device."""
name: str = ''
"""User-defined name of the device."""
location: str = ''
"""User-defined location of the device."""
zmq_port_pub: int | None = None
"""Port number for the ZeroMQ PUB service."""
zmq_port_rep: int | None = None
"""Port number for the ZeroMQ REP service."""


class BpodInfo(msgspec.Struct):
"""Information about a specific Bpod device."""

serial_number: str
"""Serial number of the device."""
port: str | None = None
"""Port on which the device is connected."""
name: str = ''
"""User-defined name of the device."""
location: str = ''
"""User-defined location of the device."""
zmq_pub: str | None = None
"""ZeroMQ PUB service address."""
zmq_rep: str | None = None
"""ZeroMQ REP service address."""


class VersionInfo(msgspec.Struct, frozen=True):
"""Data structure representing various version information."""

firmware: tuple[int, int]
"""Firmware version (major, minor)"""
machine: int
"""Machine type (numerical)"""
machine_str: str
"""Machine type (string)"""
pcb: int | None
"""PCB revision, if applicable"""
bpod_core: str
"""bpod-core version"""


class HardwareConfiguration(msgspec.Struct, frozen=True):
"""Represents the Bpod's on-board hardware configuration."""

max_states: int
"""Maximum number of supported states in a single state machine description."""
cycle_period: int
"""Period of the state machine's refresh cycle during a trial in microseconds."""
max_serial_events: int
"""Maximum number of behavior events allocatable among connected modules."""
max_bytes_per_serial_message: int
"""Maximum number of bytes allowed per serial message."""
n_global_timers: int
"""Number of global timers supported."""
n_global_counters: int
"""Number of global counters supported."""
n_conditions: int
"""Number of condition-events supported."""
n_inputs: int
"""Number of input channels."""
input_description: bytes
"""Array indicating the state machine's onboard input channel types."""
n_outputs: int
"""Number of channels in the state machine's output channel description array."""
output_description: bytes
"""Array indicating the state machine's onboard output channel types."""
cycle_frequency: int
"""Frequency of the state machine's refresh cycle during a trial in Hertz."""
n_modules: int
"""Number of modules supported by the state machine."""
64 changes: 35 additions & 29 deletions bpod_core/com.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import logging
import re
import struct
import weakref
from collections.abc import Callable, Sequence
from contextlib import AbstractContextManager
from types import TracebackType
from typing import Any, cast

Expand Down Expand Up @@ -412,20 +414,38 @@ def verify_serial_discovery(
return False


class USBSerialDevice:
def _close_serial_connection(serial: Serial, raise_errors: bool = False) -> None:
"""Close a serial connection if open."""
if not getattr(serial, 'is_open', False):
return
logger.debug('Closing connection to serial device on %s', serial.port)
try:
serial.close()
except Exception as e:
if not raise_errors:
return
raise SerialException(
f'Failed to close connection to serial device on {serial.port}'
) from e


class SerialDevice(AbstractContextManager):
"""Class that interfaces with a USB serial device."""

_serial: ExtendedSerial
"""The serial connection to the USB device."""
"""The serial connection to the device."""

_port_info: ListPortInfo
"""Information about the serial port associated with the device."""

_device_type: str = 'serial device'
"""The type of the USB device, e.g., 'Bpod'."""

def __init__(self, port: str, open_connection: bool = True, **kwargs: Any) -> None:
"""Initialize the USB serial device.
def __init__(
self,
port: str,
open_connection: bool = True,
serial_device_name: str = 'serial_device',
**kwargs: Any,
) -> None:
"""Initialize the serial device.

Parameters
----------
Expand All @@ -445,21 +465,13 @@ def __init__(self, port: str, open_connection: bool = True, **kwargs: Any) -> No
self._port_info = next(p for p in comports() if p.device == port)
except StopIteration as e:
raise SerialException(f'Serial port not found: {port}') from e
self._serial_device_name = serial_device_name
self._serial = ExtendedSerial()
weakref.finalize(self, _close_serial_connection, self._serial)
self._serial.port = port
if open_connection:
self.open()

def __enter__(self) -> Self:
"""Enter the context manager.

Returns
-------
Self
The device instance.
"""
return self

def __exit__(
self,
exc_type: type[BaseException] | None,
Expand All @@ -479,7 +491,8 @@ def __exit__(
exc_tb : TracebackType | None
The traceback object, if any.
"""
self.close()
if hasattr(self, '_serial'):
_close_serial_connection(self._serial)

def open(self) -> None:
"""Open the serial connection.
Expand All @@ -493,12 +506,12 @@ def open(self) -> None:
"""
if self._serial.is_open:
return
logger.debug('Opening connection to %s on %s', self._device_type, self.port)
logger.debug('Opening connection to serial device on %s', self.port)
try:
self._serial.open()
except Exception as e:
raise SerialException(
f'Failed to open connection to {self._device_type} on {self.port}'
f'Failed to open connection to serial device on {self.port}'
) from e

def close(self) -> None:
Expand All @@ -511,15 +524,8 @@ def close(self) -> None:
serial.SerialException
If the connection cannot be closed.
"""
if not self._serial.is_open:
return
logger.debug('Closing connection to %s on %s', self._device_type, self.port)
try:
self._serial.close()
except Exception as e:
raise SerialException(
f'Failed to close connection to {self._device_type} on {self.port}'
) from e
if hasattr(self, '_serial'):
_close_serial_connection(self._serial, raise_errors=True)

@property
def port(self) -> str:
Expand Down
13 changes: 9 additions & 4 deletions bpod_core/constants.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
"""Constants and identifiers used throughout the package."""

from enum import IntEnum
from struct import Struct

import platformdirs

# pre-compiled structs for common data types
STRUCT_BOOL = Struct('?')
"""Compiled struct representing a boolean value."""
Expand Down Expand Up @@ -30,13 +33,15 @@
VID_TEENSY: int = 0x16C0
"""Vendor ID of Teensy microcontrollers."""

PLATFORMDIRS = platformdirs.PlatformDirs(appname='bpod-core', appauthor=False)


class PIDsTeensy:
class TeensyPID(IntEnum):
"""Product IDs of Teensy microcontrollers."""

SERIAL: int = 0x0483
SERIAL = 0x0483
"""Product ID of Teensy microcontrollers with single USB serial port."""
DUAL_SERIAL: int = 0x048B
DUAL_SERIAL = 0x048B
"""Product ID of Teensy microcontrollers with dual USB serial ports."""
TRIPLE_SERIAL: int = 0x048C
TRIPLE_SERIAL = 0x048C
"""Product ID of Teensy microcontrollers with triple USB serial ports."""
Loading