diff --git a/LICENSE b/LICENSE index 02cca62..26457b7 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2018, Pierre Clisson +Copyright (c) 2024, Pierre Clisson Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.rst b/README.rst index 5e8667b..05ec20d 100644 --- a/README.rst +++ b/README.rst @@ -1,8 +1,7 @@ -An example Timeflux plugin -========================== +OctaEEG driver for Timeflux +=========================== -This is an example plugin that provides a few simple demonstration nodes. Use it as a template -to develop your own plugins. +This plugin provides a driver to connect to the OctaEEG board Installation ------------ @@ -14,4 +13,4 @@ You can then install this plugin in the `timeflux` environment: :: $ conda activate timeflux - $ pip install timeflux_example + $ pip install git+https://github.com/timeflux/timeflux_octaeeg.git diff --git a/examples/dynamic.yaml b/examples/dynamic.yaml deleted file mode 100644 index bc67834..0000000 --- a/examples/dynamic.yaml +++ /dev/null @@ -1,27 +0,0 @@ -graphs: - - - id: DynamicIO - - nodes: - - id: node_1 - module: timeflux_example.nodes.dynamic - class: Outputs - params: - seed: 1 - - id: node_2 - module: timeflux_example.nodes.dynamic - class: Inputs - - id: node_3 - module: timeflux.nodes.debug - class: Display - - id: node_4 - module: timeflux.nodes.debug - class: Display - - edges: - - source: node_1:* # The magic happens here - target: node_2 - - source: node_1 - target: node_3 - - source: node_2 - target: node_4 \ No newline at end of file diff --git a/examples/dynamic_prefixed.yaml b/examples/dynamic_prefixed.yaml deleted file mode 100644 index c301ce3..0000000 --- a/examples/dynamic_prefixed.yaml +++ /dev/null @@ -1,30 +0,0 @@ -graphs: - - - id: DynamicIO - - nodes: - - id: node_1 - module: timeflux_example.nodes.dynamic - class: Outputs - params: - seed: 1 - prefix: foo - - id: node_2 - module: timeflux_example.nodes.dynamic - class: Inputs - params: - prefix: bar - - id: node_3 - module: timeflux.nodes.debug - class: Display - - id: node_4 - module: timeflux.nodes.debug - class: Display - - edges: - - source: node_1:foo_* # Dynamic inputs can be prefixed - target: node_2:bar # The same goes for outputs - - source: node_1 - target: node_3 - - source: node_2 - target: node_4 \ No newline at end of file diff --git a/examples/multi.yaml b/examples/multi.yaml deleted file mode 100644 index 4305ba3..0000000 --- a/examples/multi.yaml +++ /dev/null @@ -1,50 +0,0 @@ -graphs: - - - id: multi - nodes: - - id: matrix_1 - module: timeflux.nodes.random - class: Random - params: - columns: 2 - rows_min: 2 - rows_max: 2 - value_min: 1 - value_max: 1 - seed: 1 - - id: matrix_2 - module: timeflux.nodes.random - class: Random - params: - columns: 2 - rows_min: 2 - rows_max: 2 - value_min: 2 - value_max: 2 - seed: 1 - - id: matrix_add - module: timeflux_example.nodes.arithmetic - class: MatrixAdd - - id: display_matrix_1 - module: timeflux.nodes.debug - class: Display - - id: display_matrix_2 - module: timeflux.nodes.debug - class: Display - - id: display_matrix_add - module: timeflux.nodes.debug - class: Display - - edges: - - source: matrix_1 - target: matrix_add:m1 - - source: matrix_2 - target: matrix_add:m2 - - source: matrix_1 - target: display_matrix_1 - - source: matrix_2 - target: display_matrix_2 - - source: matrix_add - target: display_matrix_add - - rate: 0.1 \ No newline at end of file diff --git a/examples/sine.yaml b/examples/sine.yaml deleted file mode 100644 index 4873c93..0000000 --- a/examples/sine.yaml +++ /dev/null @@ -1,22 +0,0 @@ -graphs: - - nodes: - - id: sine - module: timeflux_example.nodes.signal - class: Sine - params: - frequency: 120 - amplitude: 1 - resolution: 44100 - - id: ui - module: timeflux_ui.nodes.ui - class: UI - params: - settings: - monitor: - millisPerPixel: 0.25 - lineWidth: 1 - interpolation: linear - edges: - - source: sine - target: ui:sine - rate: 10 \ No newline at end of file diff --git a/examples/sinus.yaml b/examples/sinus.yaml deleted file mode 100644 index 344bd0c..0000000 --- a/examples/sinus.yaml +++ /dev/null @@ -1,19 +0,0 @@ -graphs: - - - nodes: - - id: sinus - module: timeflux_example.nodes.sinus - class: Sinus - params: - rate: 1 - amplitude: 1 - - - id: ui - module: timeflux_ui.nodes.ui - class: UI - - edges: - - source: sinus - target: ui:sinus - - rate: 100 \ No newline at end of file diff --git a/examples/test.yaml b/examples/test.yaml index 696612d..59af1ba 100644 --- a/examples/test.yaml +++ b/examples/test.yaml @@ -1,27 +1,17 @@ graphs: - - - nodes: - - id: node_1 - module: timeflux.nodes.random - class: Random - params: - columns: 5 - rows_min: 1 - rows_max: 10 - value_min: 0 - value_max: 5 - seed: 1 - - id: node_2 - module: timeflux_example.nodes.arithmetic - class: Add - params: - value: 1 - - id: node_3 - module: timeflux.nodes.debug - class: Display - + - id: EEG + nodes: + - id: EEG + module: timeflux_octaeeg.nodes.driver + class: OctaEEG + params: + rate: 1000 + gain: 1 + names: [PO7, O1, Oz, O2, PO8, PO3, POz, PO4] + - id: Display + module: timeflux.nodes.debug + class: Display edges: - - source: node_1 - target: node_2 - - source: node_2 - target: node_3 \ No newline at end of file + - source: EEG + target: Display + rate: 1 diff --git a/setup.cfg b/setup.cfg index 82bc236..88905fe 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,15 +1,15 @@ [metadata] -name = timeflux-example -description = An example Timeflux plugin +name = timeflux-octaeeg +description = Driver for OctaEEG long_description = file: README.rst author = Pierre Clisson author-email = contact@timeflux.io license = MIT home-page = https://timeflux.io project_urls = - Documentation = http://doc.timeflux.io/projects/timeflux-example/ - Source Code = https://github.com/timeflux/timeflux_example - Bug Tracker = https://github.com/timeflux/timeflux_example/issues + Documentation = http://doc.timeflux.io/projects/timeflux-octaeeg/ + Source Code = https://github.com/timeflux/timeflux_octaeeg + Bug Tracker = https://github.com/timeflux/timeflux_octaeeg/issues classifier = Development Status :: 4 - Beta Environment :: Console @@ -23,7 +23,8 @@ keywords = timeflux [options] packages = find: install_requires = - timeflux>=0.5.3 + timeflux>=0.17.1 + websocket_client>=1.8.0 [options.extras_require] dev = @@ -34,9 +35,9 @@ dev = docinit [docinit] -name = Example plugin +name = Timeflux OctaEEG parent_url = https://doc.timeflux.io logo_url = https://github.com/timeflux/timeflux/raw/master/doc/static/img/logo.png [build_sphinx] -warning-is-error = 1 \ No newline at end of file +warning-is-error = 1 diff --git a/test/test_arithmetic.py b/test/test_arithmetic.py deleted file mode 100644 index ff6041d..0000000 --- a/test/test_arithmetic.py +++ /dev/null @@ -1,25 +0,0 @@ -"""Tests for arithmetic.py""" - -import pytest -import pandas as pd -from pandas.testing import assert_frame_equal -from timeflux.core.io import Port -from timeflux_example.nodes.arithmetic import Add, MatrixAdd - -def test_add(): - node = Add(1) - node.i = Port() - node.i.data = pd.DataFrame([[1, 1], [1, 1]]) - node.update() - expected = pd.DataFrame([[2, 2], [2, 2]]) - assert_frame_equal(node.o.data, expected) - -def test_matrix(): - node = MatrixAdd() - node.i_m1 = Port() - node.i_m2 = Port() - node.i_m1.data = pd.DataFrame([[1, 1], [1, 1]]) - node.i_m2.data = pd.DataFrame([[2, 2], [2, 2]]) - node.update() - expected = pd.DataFrame([[3, 3], [3, 3]]) - assert_frame_equal(node.o.data, expected) diff --git a/timeflux_example/nodes/arithmetic.py b/timeflux_example/nodes/arithmetic.py deleted file mode 100644 index 2351884..0000000 --- a/timeflux_example/nodes/arithmetic.py +++ /dev/null @@ -1,80 +0,0 @@ -"""Simple example nodes""" - -from timeflux.core.node import Node - - -class Add(Node): - - """Adds ``value`` to each cell of the input. - - This is one of the simplest possible nodes. - - Attributes: - i (Port): Default input, expects DataFrame. - o (Port): Default output, provides DataFrame. - - Example: - .. literalinclude:: /../examples/test.yaml - :language: yaml - """ - - def __init__(self, value): - """ - Args: - value (int): The value to add to each cell. - """ - self._value = value - - def update(self): - # Make sure we have a non-empty dataframe - if self.i.ready(): - # Copy the input to the output - self.o = self.i - # Add the value to each cell - self.o.data += self._value - - -class MatrixAdd(Node): - - """Sum two input matrices together. - - This node illustrates multiple named inputs. - Note that it is not necessary to declare the ports. They will be created dynamically. - - Attributes: - i_m1 (Port): First matrix, expects DataFrame. - i_m2 (Port): Second matrix, expects DataFrame. - o (Port): Default output, provides DataFrame. - - Example: - .. literalinclude:: /../examples/multi.yaml - :language: yaml - """ - - def __init__(self): - pass - - def update(self): - # propagate the meta - self.o.meta = self.i_m1.meta - self.o.meta.update(self.i_m2.meta) - # sum the data - self.o.data = self.i_m1.data + self.i_m2.data - - -class MatrixDivide(Node): - - """Divide one matrix by another. - - Attributes: - i_m1 (Port): First matrix, expects DataFrame. - i_m2 (Port): Second matrix, expects DataFrame. - o (Port): Default output, provides DataFrame. - - """ - - def __init__(self): - pass - - def update(self): - self.o.data = self.i_m1.data.divide(self.i_m2.data) diff --git a/timeflux_example/nodes/dynamic.py b/timeflux_example/nodes/dynamic.py deleted file mode 100644 index 6148d2e..0000000 --- a/timeflux_example/nodes/dynamic.py +++ /dev/null @@ -1,67 +0,0 @@ -"""Illustrates dynamic inputs and outputs.""" - -import random -from timeflux.core.node import Node - - -class Outputs(Node): - - """Randomly generate dynamic outputs. - - At each update, this node generates a random number of outputs and sets the default output - to the number it has created. - - Attributes: - o (Port): Default output, provides DataFrame. - o_* (Port): Dynamic outputs. - - Args: - seed (int): The random number generator seed. - prefix (string): The prefix to add to each dynamic output. - - Example: - .. literalinclude:: /../examples/dynamic_prefixed.yaml - :language: yaml - """ - - def __init__(self, prefix=None, seed=None): - random.seed(seed) - self.prefix = "" if prefix is None else prefix + "_" - - def update(self): - # Lazily create new ports - for i in range(random.randint(0, 10)): - getattr(self, "o_" + self.prefix + str(i)) - # Count - outputs = len(list(self.iterate("o_*"))) - # Set default output - self.o.set([[outputs]], names=["outputs"]) - - -class Inputs(Node): - - """Count the dynamic outputs. - - At each update, this node loops over all dynamic inputs and sets the default output - to the number it has found. - - Attributes: - i_* (Port): Dynamic inputs. - o (Port): Default output, provides DataFrame. - - Args: - prefix (string): The prefix to add to match dynamic inputs. - - Example: - .. literalinclude:: /../examples/dynamic.yaml - :language: yaml - """ - - def __init__(self, prefix=None): - self.prefix = "" if prefix is None else prefix + "_" - - def update(self): - # Count - inputs = len(list(self.iterate("i_" + self.prefix + "*"))) - # Set default output - self.o.set([[inputs]], names=["inputs"]) diff --git a/timeflux_example/nodes/signal.py b/timeflux_example/nodes/signal.py deleted file mode 100644 index 5095f1f..0000000 --- a/timeflux_example/nodes/signal.py +++ /dev/null @@ -1,46 +0,0 @@ -"""timeflux_example.nodes.signal: generate signals""" - -import time -import numpy as np -from timeflux.core.node import Node - - -class Sine(Node): - """Generate a sinusoidal signal. - - Attributes: - o (Port): Default output, provides DataFrame. - - Args: - frequency (float): cycles per second. Default: ``1``. - resolution (int): points per second. Default: ``200``. - amplitude (float): signal amplitude. Default: ``1``. - name (string): signal name. Default: ``sine``. - - Example: - .. literalinclude:: /../examples/sine.yaml - :language: yaml - """ - - def __init__(self, frequency=1, resolution=200, amplitude=1, name="sine"): - self._frequency = frequency - self._resolution = int(resolution) - self._amplitude = amplitude - self._name = name - self._radian = 0 - self._now = time.time() - - def update(self): - now = time.time() - elapsed = now - self._now - points = int(elapsed * self._resolution) + 1 - cycles = self._frequency * elapsed - values = np.linspace(self._radian, np.pi * 2 * cycles + self._radian, points) - signal = np.sin(values[:-1]) * self._amplitude - timestamps = np.linspace( - int(self._now * 1e6), int(now * 1e6), points, False, dtype="datetime64[us]" - )[1:] - self._now = now - self._radian = values[-1] - self.o.set(signal, timestamps, names=[self._name]) - self.o.meta = {"rate": self._frequency} diff --git a/timeflux_example/nodes/sinus.py b/timeflux_example/nodes/sinus.py deleted file mode 100644 index 66bde20..0000000 --- a/timeflux_example/nodes/sinus.py +++ /dev/null @@ -1,44 +0,0 @@ -"""timeflux_example.nodes.sinus: generate sinusoidal signal""" - -import numpy as np -from timeflux.core.node import Node -from timeflux.helpers.clock import time_to_float, float_to_time, now -from timeflux.core.registry import Registry - - -class Sinus(Node): - """Return a sinusoidal signal sampled to registry rate. - - This node generates a sinusoidal signal of chosen frequency and amplitude. - Note that at each update, the node generate one row, so its sampling rate - equals the graph parsing rate (given by the Registry). - - Attributes: - o (Port): Default output, provides DataFrame. - - Example: - .. literalinclude:: /../examples/sinus.yaml - :language: yaml - - .. deprecated:: - Use :func:`timeflux_example.nodes.signal.Sine` instead. - - """ - - def __init__(self, amplitude=1, rate=1, name="sinus"): - self._amplitude = amplitude - self._rate = rate - self._name = name - self._start = None - - def update(self): - timestamp = now() - float = time_to_float(timestamp) - if self._start is None: - self._start = float - - values = [ - self._amplitude * np.sin(2 * np.pi * self._rate * (float - self._start)) - ] - self.o.set(values, names=[self._name]) - self.o.meta = {"rate": Registry.rate} diff --git a/timeflux_example/__init__.py b/timeflux_octaeeg/__init__.py similarity index 100% rename from timeflux_example/__init__.py rename to timeflux_octaeeg/__init__.py diff --git a/timeflux_example/nodes/__init__.py b/timeflux_octaeeg/nodes/__init__.py similarity index 100% rename from timeflux_example/nodes/__init__.py rename to timeflux_octaeeg/nodes/__init__.py diff --git a/timeflux_octaeeg/nodes/driver.py b/timeflux_octaeeg/nodes/driver.py new file mode 100644 index 0000000..01be075 --- /dev/null +++ b/timeflux_octaeeg/nodes/driver.py @@ -0,0 +1,161 @@ +"""OctaEEG Driver""" + +import websocket +import socket +import numpy as np +from timeflux.core.node import Node +from timeflux.core.exceptions import WorkerInterrupt +from timeflux.helpers.clock import now +from threading import Thread, Lock + + +RATES = { 250: 0x06, 500: 0x05, 1000: 0x04, 2000: 0x03, 4000: 0x02, 8000: 0x01, 16000: 0x00 } +GAINS = { 1: 0xC0, 2: 0xC1, 4:0xC2, 6: 0xC3, 8: 0xC4, 12: 0xC5, 24: 0xC6 } +CHANNELS = 8 + +class OctaEEG(Node): + + """OctaEEG Driver. + + Args: + port (string): The serial port. + e.g. ``COM3`` on Windows; ``/dev/cu.usbmodem14601`` on MacOS; + ``/dev/ttyUSB0`` on GNU/Linux. + rate (int): The device rate in Hz. + Allowed values: ``250``, ``500``, ``1024``, ``2048``, ``4096``, ``8192``, + ``16384``. Default: ``250``. + gain (int): The amplifier gain. + Allowed values: ``1``, ``2``, ``4``, ``6``, ``8``, ``12``, ``24``. + Default: ``24``. + names (int): The number of channels to enable. Default: ``8``. + + Attributes: + o (Port): Default output, provides DataFrame. + + Example: + .. literalinclude:: /../examples/test.yaml + :language: yaml + """ + + def __init__(self, rate=250, gain=1, names=None): + + # Validate input + if rate not in RATES: + raise ValueError( + f"`{rate}` is not a valid rate; valid rates are: {sorted(RATES.keys())}" + ) + if gain not in GAINS.keys(): + raise ValueError( + f"`{gain}` is not a valid gain; valid gains are: {sorted(GAINS.keys())}" + ) + self.rate = rate + self.gain = gain + + # Set channel names + if isinstance(names, list) and len(names) == CHANNELS: + self.names = names + else: + self.names = list(range(1, CHANNELS + 1)) + + # Connect + self._ws = websocket.WebSocket() + try: + self._ws.connect(f"ws://{socket.gethostbyname('oric.local')}:81") + self.logger.debug("Connected") + except: + raise WorkerInterrupt("Could not connect to device") + + # Set sampling rate and gain + self._set_sampling_rate(rate) + self._set_gain(gain) + + # Compute time offset + # TODO + + # Set meta + self.meta = { "rate": rate } + + # Launch background thread + self._reset() + self._lock = Lock() + self._running = True + self._thread = Thread(target=self._loop).start() + + + def _set_sampling_rate(self, rate, fps=1): + """Set sampling rate.""" + # TODO: configure FPS + if rate in RATES: + byte = RATES[rate] << 1 + command = 0x80 | byte | fps + self._ws.send_binary(command.to_bytes(1, byteorder="big")) + + + def _set_gain(self, gain): + """Set gain.""" + # TODO: set gain per channel + if gain in GAINS: + self._ws.send_binary(GAINS[gain].to_bytes(1, byteorder="big")) + + + def _reset(self): + """Empty cache.""" + self._rows = [] + self._timestamps = [] + + + def _loop(self): + """Acquire and cache data.""" + delta = np.timedelta64(np.int64(1e9 / self.rate), "ns") + while self._running: + #try: + timestamp, data = self._read() + timestamps = [timestamp] + for i in range(len(data) - 1): + timestamps.append(timestamps[-1] - delta) + timestamps.reverse() + if data: + self._lock.acquire() + self._timestamps += timestamps + self._rows += data + self._lock.release() + # except: + # pass + + + def _read(self): + """Receive packets from the device.""" + data = self._ws.recv() + timestamp = now() # TODO: adjust for latency + block_size = 32 + rows = [] + for block_index in range(0, len(data), block_size): + block = data[block_index:block_index + block_size] + counter = block[0] # TODO: check for packet loss + row = [] + for channel in range(0, CHANNELS): + # TODO: check impedance + channel_offset = 1 + (channel * 3) + sample = int.from_bytes(block[channel_offset:channel_offset + 3], byteorder="big", signed=True) + sample *= (1e6 * ((4.5 / 8388607) / self.gain)) + row.append(sample) + rows.append(row) + return timestamp, rows + + + def update(self): + """Update the node output.""" + self._lock.acquire() + if self._rows: + self.o.set(self._rows, self._timestamps, self.names, meta=self.meta) + self._reset() + self._lock.release() + + + def terminate(self): + """Cleanup.""" + self._running = False + while self._thread and self._thread.is_alive(): + sleep(0.001) + +