Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Device interface dev #118

Draft
wants to merge 22 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
6b2915b
Implemented Device and AcquisitionMixin class. The latter can be used…
vasilisniaouris Sep 5, 2023
d276268
Added `utils.py` that implements helper methods for device management…
vasilisniaouris Sep 5, 2023
a015a91
In `AcquisitionMixin`, added logger access and modified pause-resume …
vasilisniaouris Sep 6, 2023
bddfe74
Added `logger.py` which helps get a preconfigured logger according to…
vasilisniaouris Sep 6, 2023
449db4e
Updated `logger_config.yaml` with a new formatter.
vasilisniaouris Sep 7, 2023
d6ca478
Updated `devices.py` and `utils.py`.
vasilisniaouris Sep 7, 2023
69c9bcd
Started working on `powermeters.py` to test the new devices classes.
vasilisniaouris Sep 7, 2023
b9627af
Minor updates for resource definition via search.
vasilisniaouris Sep 8, 2023
d1fd2be
Minor updates for resource definition via search in `MessageBasedDevi…
vasilisniaouris Sep 8, 2023
ba8d8a5
Added post-communication delay on message-based device class, to deal…
vasilisniaouris Sep 9, 2023
c9d981f
Moved acquistion-related definitions to `acquisition.py`. Defined Acq…
vasilisniaouris Sep 9, 2023
4785d27
Worked on furthering the definition of the Newport powermeter.
vasilisniaouris Sep 9, 2023
4897ead
Defined a LoggableMixin class to subclass so that we get class-specif…
vasilisniaouris Sep 9, 2023
07d381b
Added small comments in logger.py.
vasilisniaouris Sep 9, 2023
e61279e
`Device` in devices.py has its own logger now
vasilisniaouris Sep 9, 2023
9f845c0
Further developed AcquisitionThread, StreamingThread, and Acquisition…
vasilisniaouris Sep 9, 2023
254b5db
Updated acquisition.py classes to use LoggableMixin instead of indivi…
vasilisniaouris Sep 9, 2023
7329112
Updated logger.py documentation.
vasilisniaouris Sep 9, 2023
ee68940
Updated `TimeSeriesData` class.
vasilisniaouris Sep 10, 2023
9b8f8eb
Updated lock use in acquisition.py.
vasilisniaouris Sep 10, 2023
a21d6d7
Added a random value sensor as an example of Device and AcquisitionMi…
vasilisniaouris Sep 13, 2023
348d7f6
Minute changes related to mixin class issues. Need to work more on it.
vasilisniaouris Sep 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
722 changes: 722 additions & 0 deletions src/qt3utils/devices/acquisition.py

Large diffs are not rendered by default.

251 changes: 251 additions & 0 deletions src/qt3utils/devices/devices.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
import abc
import logging
import time
from typing import final, Type

from pyvisa import ResourceManager
from pyvisa.attributes import Attribute
from pyvisa.resources import MessageBasedResource

import threading

import queue

from src.qt3utils.devices.utils import force_clear_message_based_resource, MessageBasedResourceType, ResourceType, \
find_available_resources_by_idn, find_available_resources_by_visa_attribute
from src.qt3utils.logger import get_configured_logger, LoggableMixin

# TODO: Must have only one pyvisa resource manager, resources are bound to said manager.

resource_manager = ResourceManager()
logger = get_configured_logger(__name__) # TODO: lear more about logging applications from pyvisa logging.


def _str_is_float(string: str) -> bool:
try:
float(string)
return True
except ValueError:
return False


def _convert_str(string: str) -> int | float | str:
if string.isdigit():
return int(string)
elif _str_is_float(string):
return float(string)
else:
return string


class Device(abc.ABC, LoggableMixin):
"""
Interface class for all device implementations.
"""

DEVICE_PY_ALIAS: str
""" Device alias for logging purposes."""

# TODO: add loaded dll, ni daq task etc for mediator?
# it can also be an arbitrary class defined by the manufacturer or user?
# TODO: Figure out which information I want to retain about each device and the acquisition process itself, so that
# I can save it in the data. I would prefer using a Dataclass for data saving.

def __init__(self, mediator: ResourceType | None = None):
super(LoggableMixin).__init__(logger=get_configured_logger(self.DEVICE_PY_ALIAS))
self._lock = threading.Lock()
self.mediator = mediator
# TODO: Add a device lock, and use it as a decorator every time you access the hardware.
# Each function that calls the hardware needs to wait for the lock to be ready.
# This might be inefficient in the case of the time series. For the time-series we only need to do it before
# and after thread initialization and completion?

@abc.abstractmethod
def connect(self):
pass

@abc.abstractmethod
def disconnect(self):
pass

@abc.abstractmethod
def is_connected(self):
pass

@abc.abstractmethod
def clear(self):
pass

# @abc.abstractmethod
# def send_command(self, command):
# pass
#
# @abc.abstractmethod
# def receive_data(self):
# pass
#
# @abc.abstractmethod
# def get_device_info(self):
# pass
#
# @abc.abstractmethod
# def get_device_name(self):
# pass
#
# @abc.abstractmethod
# def get_device_type(self):
# pass
#
# @abc.abstractmethod
# def get_device_id(self):
# pass

def __del__(self):
self.disconnect()


class MessageBasedDevice(Device, abc.ABC):

DEFAULT_WRITE_TERMINATION = r'\r\n'
DEFAULT_READ_TERMINATION = r'\n'

DEFAULT_QUERY_DELAY = 10 ** -9 # s, even the smallest delay will help your device-read from crushing on you.
DEFAULT_POST_COMMUNICATION_DELAY = 10 ** -9 # same communication issue when device is not ready to move forward.

def __init__(
self,
mediator: MessageBasedResourceType,
post_communication_delay: float = DEFAULT_POST_COMMUNICATION_DELAY):
super(MessageBasedDevice).__init__(mediator)
super(Device).__init__(mediator)

self._post_communication_delay = post_communication_delay

def connect(self):
with self._lock:
self.mediator.open()
self.clear()

def disconnect(self):
self.clear()
with self._lock:
self.mediator.before_close()
self.mediator.close()

def is_connected(self):
# Accessing the session property itself will raise an InvalidSession exception if the session is not open.
with self._lock:
return self.mediator._session is not None

def clear(self, force: bool = False):
with self._lock:
self.clear()
time.sleep(self.DEFAULT_POST_COMMUNICATION_DELAY)

if force:
self.safe_write('*CLS')
force_clear_message_based_resource(self.mediator, lock=self._lock)

def safe_query(self, message: str, delay: float | None = None) -> str:
with self._lock:
response = self.mediator.query(message, delay)
time.sleep(self.DEFAULT_POST_COMMUNICATION_DELAY)
return response

def safe_write(self, message: str, termination: str | None = None, encoding: str | None = None):
with self._lock:
self.mediator.write(message, termination, encoding)
time.sleep(self.DEFAULT_POST_COMMUNICATION_DELAY)

def safe_read(self, termination: str | None = None, encoding: str | None = None) -> str:
with self._lock:
response = self.mediator.read(termination, encoding)
time.sleep(self.DEFAULT_POST_COMMUNICATION_DELAY)
return response

@property
def post_communication_delay(self):
return self._post_communication_delay

@post_communication_delay.setter
def post_communication_delay(self, value: float):
self._post_communication_delay = value

@staticmethod
def parse_response(response: str) -> list[int | float | str] | int | float | str:
response = response.strip()
response_list: list[str] = response.split(',')
response_list = [_convert_str(r.strip()) for r in response_list]

return response_list if len(response_list) > 1 else response_list[0]

@staticmethod
def _set_rm_kwargs_defaults(method):
def wrapper(cls, **rm_kwargs):
rm_kwargs.setdefault('write_termination', cls.DEFAULT_WRITE_TERMINATION)
rm_kwargs.setdefault('read_termination', cls.DEFAULT_READ_TERMINATION)
rm_kwargs.setdefault('query_delay', cls.DEFAULT_QUERY_DELAY)
return method(cls, **rm_kwargs)
return wrapper

@classmethod
@_set_rm_kwargs_defaults
def from_resource_name(
cls,
resource_name: str,
post_communication_delay: float = DEFAULT_POST_COMMUNICATION_DELAY,
**rm_kwargs,
) -> 'MessageBasedDevice':

resource = resource_manager.open_resource(resource_name, **rm_kwargs)

if not isinstance(resource, MessageBasedResource):
# TODO: Change message
raise ValueError(f'Resource {resource} with resource_name {resource_name} is not a MessageBasedResource.')

return cls(resource, post_communication_delay)

@classmethod
@_set_rm_kwargs_defaults
def from_visa_attribute(
cls,
visa_attribute: Type[Attribute],
desired_attr_value: str,
is_partial=False,
post_communication_delay: float = DEFAULT_POST_COMMUNICATION_DELAY,
**rm_kwargs,
) -> 'MessageBasedDevice':

resource_list = find_available_resources_by_visa_attribute(
resource_manager, visa_attribute, desired_attr_value, is_partial, **rm_kwargs)

if len(resource_list) == 0:
raise ValueError(f'No resource found with visa_attribute {visa_attribute}.') # TODO: Change message
elif len(resource_list) > 1:
raise ValueError(f'Multiple resources found with visa_attribute {visa_attribute}.') # TODO: Change message

resource = resource_list[0]
if not isinstance(resource, MessageBasedResource):
# TODO: Change message
raise ValueError(f'Resource {resource} is not a MessageBasedResource.')

return cls(resource, post_communication_delay)

@classmethod
@_set_rm_kwargs_defaults
def from_idn(
cls,
idn: str,
is_partial: bool = False,
post_communication_delay: float = DEFAULT_POST_COMMUNICATION_DELAY,
**rm_kwargs,
) -> 'MessageBasedDevice':

resource_list = find_available_resources_by_idn(resource_manager, idn, is_partial, **rm_kwargs)

if len(resource_list) == 0:
raise ValueError(f'No resource found with idn {idn}.') # TODO: Change message
elif len(resource_list) > 1:
raise ValueError(f'Multiple resources found with idn {idn}.') # TODO: Change message

return cls(resource_list[0], post_communication_delay)
94 changes: 94 additions & 0 deletions src/qt3utils/devices/powermeters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import abc
from enum import Enum

from src.qt3utils.devices.devices import MessageBasedDevice, AcquisitionMixin
from src.qt3utils.devices.utils import MessageBasedResourceType


class MessageBasedOpticalPowerMeter(MessageBasedDevice, AcquisitionMixin, abc.ABC):

DEVICE_PY_ALIAS = 'Message-Based Optical Power Meter'

def __init__(
self,
mediator: MessageBasedResourceType,
post_communication_delay: float = MessageBasedDevice.DEFAULT_POST_COMMUNICATION_DELAY,
):
super(MessageBasedDevice).__init__(mediator, post_communication_delay)
super(AcquisitionMixin).__init__()

def disconnect(self):
self.stop_time_series_acquisition(force_stop=True)
super().disconnect() # Refers to Device-inherited class.


class Newport2835C(MessageBasedOpticalPowerMeter):
# TODO: check manual chapter 2.4.7 for DC sampling settings!

DEFAULT_QUERY_DELAY = 10 ** -4 # 0.02 s instead? worked best with dummy testing
DEFAULT_POST_COMMUNICATION_DELAY = 10 ** -4 # TODO: test and see what works best

DEVICE_PY_ALIAS = 'Newport 2835-C'
DEFAULT_IDN_PART = 'NewportCorp,2835-C'

MINIMUM_ACQ_SLEEP_TIME = 0.002 # s

CHANNELS = ['A', 'B', ''] # Empty string means both channels.
CHANNEL_TO_CHANNEL_BASED_COMMAND = {'A': '_A', 'B': '_B', '': ''}

READ_POWER_BASE_COMMAND = 'R'

class SamplePrecisionFrequencyLimits:
MIN_20000_CNT = 0.001 # Hz
MAX_20000_CNT = 25 # Hz

# Use when channel connections are not known.
# See more on device manual, chapter 2.4.7 on DC sampling settings.
MIN_4096_CNT = 0.001 # Hz
MAX_4096_CNT = 500 # Hz

MIN_4096_CNT_SINGLE = 0.001 # Hz
MAX_4096_CNT_SINGLE = 1000 # Hz

MIN_4096_CNT_DUAL = 0.001 # Hz
MAX_4096_CNT_DUAL = 500 # Hz

class SamplePrecisionStates(Enum):
HighAccuracy = 20000 # counts
LowAccuracy = 4096 # counts

# define sample precision freq and state input standards

def __new__(cls, *args, **kwargs): # Default creation via default DEFAULT_IDN_PART
return cls.from_idn(cls.DEFAULT_IDN_PART, True, *args, **kwargs)

def __init__(
self, mediator: MessageBasedResourceType,
post_communication_delay: float = DEFAULT_POST_COMMUNICATION_DELAY
):
super().__init__(mediator, post_communication_delay)
self.channel: str = ''

def setup_acquisition(
self,
acquisition_sleep_time: float = None,
acquisition_slow_sleep_time: float = None,
channel: list[str] = None,
units: str = None,
sample_precision_frequency: float = None,
sample_precision_accuracy: int = None,
auto_range: bool = None,
wavelength: float = None,
):
super().setup_acquisition(acquisition_sleep_time, acquisition_slow_sleep_time)
if channel is not None:
if channel not in self.CHANNELS:
raise ValueError(f'Invalid channel: {channel}') # TODO: change
self.channel = channel # check with available channels "CH?"

def single_acquisition(self) -> list:
pass

def get_channel_based_command(self, command_str: str) -> str:
return command_str + self.CHANNEL_TO_CHANNEL_BASED_COMMAND[self.channel]

Loading