-
Notifications
You must be signed in to change notification settings - Fork 22
Description
Here is a DEMO, modifie the threephase_patterns.py with this code:
from abc import ABC, abstractmethod
import numpy as np
import time
import logging
import random # NEU: Für zufällige Auswahl
Angepasster Import für PySide6.QtCore
from PySide6.QtCore import QObject, Signal, QTimer, Slot
Annahme: qt_ui.settings ist korrekt eingebunden
try:
import qt_ui.settings
except ImportError:
# Fallback Mock-Objekte, falls qt_ui.settings beim Testen nicht gefunden wird
class MockSettings:
def get(self): return 60.0
class MockSettingValue:
display_fps = MockSettings(); display_latency = MockSettings()
qt_ui = type('qt_ui', (), {'settings': MockSettingValue})
logging.warning("qt_ui.settings nicht gefunden, verwende Mock-Objekte.")
Annahme: Ihre Achsenklassen sind korrekt eingebunden
try:
from stim_math.axis import AbstractAxis, WriteProtectedAxis
except ImportError:
logging.warning("stim_math.axis nicht gefunden, verwende Dummy-Achsenklassen.")
class WriteProtectedAxis:
def init(self, name, initial_value=0.0): self._value = initial_value
def get(self): return self._value
def set(self, value): self._value = value
class AbstractAxis:
def init(self, name="dummy", write_protected_axis=None):
self._wp_axis = write_protected_axis if write_protected_axis else WriteProtectedAxis(name + "_wp")
self._val = 0.0
def add(self, value, timestamp=None): self._val = value
def last_value(self): return self._val
def interpolate(self, timestamp): return self._val
def get_write_protected_axis(self): return self._wp_axis
logger = logging.getLogger('restim.motion_generation')
Für detaillierteres Debugging können Sie das Level hier oder global anpassen:
logging.basicConfig(level=logging.DEBUG)
logger.setLevel(logging.DEBUG)
class ThreephasePattern(ABC):
def init(self):
pass # Ersetzt '...'
@abstractmethod # Hinzugefügt, da name() in Subklassen implementiert wird
def name(self):
pass # Ersetzt '...'
@abstractmethod
def update(self, dt: float):
pass # Ersetzt '...'
class MousePattern(ThreephasePattern):
def init(self, alpha: AbstractAxis, beta: AbstractAxis):
super().init()
self.alpha = alpha
self.beta = beta
self.x = 0.00001 # hack to force display update on load
self.y = 0
def name(self):
return "mouse" # Name klein geschrieben für Konsistenz mit ComboBox
def mouse_event(self, x, y):
self.alpha.add(x) # Annahme: add() kümmert sich um Timestamps falls nötig
self.beta.add(y)
self.x = x
self.y = y
def update(self, dt: float):
return self.x, self.y
def last_position_is_mouse_position(self):
return (self.x, self.y) == (self.alpha.last_value(), self.beta.last_value())
class CirclePattern(ThreephasePattern):
def init(self):
super().init()
self.angle = 0.0
self.current_speed_factor = 1.0 # Start-Geschwindigkeitsfaktor
self.speed_adjustment_step = 0.5 # Schrittweite der zufälligen Anpassung
self.min_speed_factor = 0.2 # Minimale Geschwindigkeit (verhindert Stopp, wenn >0)
self.max_speed_factor = 3.0 # Maximale Geschwindigkeit
logger.info(f"CirclePattern ID {id(self)} initialisiert. SpeedFactor: {self.current_speed_factor:.1f}, Step: {self.speed_adjustment_step:.1f}, Min/Max: {self.min_speed_factor:.1f}/{self.max_speed_factor:.1f}")
def name(self):
return "Circle" # Name wie in ComboBox
def update(self, dt: float):
self.angle = self.angle + dt * self.current_speed_factor # Verwendet den variablen Faktor
return np.cos(self.angle), np.sin(self.angle)
def randomly_adjust_speed(self):
adjustment = random.choice([-self.speed_adjustment_step, self.speed_adjustment_step])
new_speed_factor = self.current_speed_factor + adjustment
self.current_speed_factor = np.clip(new_speed_factor, self.min_speed_factor, self.max_speed_factor)
logger.info(f"CirclePattern ID {id(self)} speed_factor zufällig angepasst: {self.current_speed_factor:.2f} (Anpassung: {adjustment:.2f})")
class ThreephaseMotionGenerator(QObject):
position_updated = Signal(float, float)
def __init__(self, parent, alpha: AbstractAxis, beta: AbstractAxis):
super().__init__(parent)
self.alpha = alpha
self.beta = beta
self.script_alpha = None
self.script_beta = None
self.mouse_pattern = MousePattern(alpha, beta)
self.circle_pattern_instance = CirclePattern() # Explizite Instanz für CirclePattern
self.patterns = [
self.mouse_pattern,
self.circle_pattern_instance,
# Hier könnten später weitere Pattern-Instanzen hinzugefügt werden
]
self.pattern = self.mouse_pattern # Standardmuster
self.velocity = 1.0 # Globale Geschwindigkeit, als float initialisieren
self.last_update_time = time.time()
self.latency = 0.0 # Als float initialisieren
self.timer = QTimer(self)
self.timer.timeout.connect(self.timeout)
# self.timer.setInterval(int(1000 / 60.0)) # Wird in refreshSettings gesetzt
# NEU: Timer für zufällige Geschwindigkeitsanpassung des CirclePatterns
self.random_circle_speed_timer = QTimer(self)
self.random_circle_speed_timer.setInterval(1000) # 1000 ms = 1 Sekunde
self.random_circle_speed_timer.timeout.connect(self._on_random_circle_speed_tick)
self.refreshSettings() # Ruft auch timer.setInterval auf
logger.info("ThreephaseMotionGenerator initialisiert.")
@Slot()
def _on_random_circle_speed_tick(self):
if isinstance(self.pattern, CirclePattern) and self.timer.isActive(): # Nur wenn Circle aktiv & Haupt-Timer läuft
logger.debug(f"Random circle speed tick: CirclePattern ID {id(self.pattern)} aktiv, passe Geschwindigkeit an.")
self.pattern.randomly_adjust_speed() # Ruft Methode auf der aktuellen Pattern-Instanz auf
# else:
# logger.debug("Random circle speed tick: Bedingung nicht erfüllt (Circle nicht aktiv oder Haupt-Timer aus).")
def set_enable(self, enable: bool):
logger.info(f"Generator set_enable: {enable}")
if enable:
self.last_update_time = time.time()
if not self.timer.isActive():
self.timer.start()
logger.info("Haupt-Timer gestartet.")
# Starte den Zufalls-Timer, wenn CirclePattern aktiv ist
if isinstance(self.pattern, CirclePattern):
if not self.random_circle_speed_timer.isActive():
self.random_circle_speed_timer.start()
logger.info(f"Random Circle Speed Timer für ID {id(self.pattern)} gestartet (Generator aktiviert, Circle aktiv).")
else:
if self.timer.isActive():
self.timer.stop()
logger.info("Haupt-Timer gestoppt.")
if self.random_circle_speed_timer.isActive():
self.random_circle_speed_timer.stop()
logger.info("Random Circle Speed Timer gestoppt (Generator deaktiviert).")
def set_pattern(self, pattern_input): # Akzeptiert String (Name) oder Pattern-Instanz
previous_pattern_was_circle = isinstance(self.pattern, CirclePattern)
previous_pattern_instance = self.pattern
new_selected_pattern = None
target_pattern_name = None
if isinstance(pattern_input, str):
target_pattern_name = pattern_input
for p_instance in self.patterns:
if p_instance.name() == target_pattern_name:
new_selected_pattern = p_instance
break
elif isinstance(pattern_input, ThreephasePattern):
# Prüfen, ob die Instanz eine der bekannten ist (optional, aber sicherer)
if pattern_input in self.patterns:
new_selected_pattern = pattern_input
target_pattern_name = new_selected_pattern.name() # Holen des Namens für Logging
else:
logger.warning(f"Unbekannte Pattern-Instanz {pattern_input} an set_pattern übergeben.")
return
else:
logger.error(f"Ungültiger Typ für pattern_input in set_pattern: {type(pattern_input)}. Erwartet str oder ThreephasePattern Instanz.")
return
if new_selected_pattern is None:
logger.warning(f"Pattern '{target_pattern_name if target_pattern_name else pattern_input}' nicht gefunden. Pattern bleibt: '{self.pattern.name()}'.")
return
if self.pattern == new_selected_pattern:
logger.info(f"Pattern '{new_selected_pattern.name()}' (ID {id(new_selected_pattern)}) ist bereits aktiv.")
# Sicherstellen, dass der Timer läuft, falls Circle aktiv und Generator an ist
if isinstance(self.pattern, CirclePattern) and self.timer.isActive() and not self.random_circle_speed_timer.isActive():
self.random_circle_speed_timer.start()
logger.info(f"Random Circle Speed Timer für ID {id(self.pattern)} gestartet (Pattern bestätigt, Generator aktiv).")
return
logger.info(f"Pattern wechselt von '{previous_pattern_instance.name()} (ID {id(previous_pattern_instance)})' zu '{new_selected_pattern.name()} (ID {id(new_selected_pattern)})'")
self.pattern = new_selected_pattern
current_pattern_is_circle = isinstance(self.pattern, CirclePattern)
# Management des random_circle_speed_timer
if self.timer.isActive(): # Nur wenn der Hauptgenerator aktiviert ist
if current_pattern_is_circle:
if not self.random_circle_speed_timer.isActive():
self.random_circle_speed_timer.start()
logger.info(f"Random Circle Speed Timer für ID {id(self.pattern)} gestartet (Pattern zu Circle gewechselt, Generator aktiv).")
elif previous_pattern_was_circle: # Es war Circle, ist es aber nicht mehr
if self.random_circle_speed_timer.isActive():
self.random_circle_speed_timer.stop()
logger.info(f"Random Circle Speed Timer für ehem. Circle ID {id(previous_pattern_instance)} gestoppt (Pattern von Circle gewechselt, Generator aktiv).")
else: # Hauptgenerator ist nicht aktiv
# Wenn der Zufallstimer (fälschlicherweise) noch aktiv ist und wir nicht mehr auf Circle sind, stoppen.
if not current_pattern_is_circle and self.random_circle_speed_timer.isActive():
self.random_circle_speed_timer.stop()
logger.info("Random Circle Speed Timer gestoppt (Generator nicht aktiv, Pattern nicht Circle).")
def set_scripts(self, alpha, beta):
self.script_alpha = alpha if isinstance(alpha, WriteProtectedAxis) else None
self.script_beta = beta if isinstance(beta, WriteProtectedAxis) else None
if self.any_scripts_loaded():
if self.random_circle_speed_timer.isActive():
self.random_circle_speed_timer.stop()
logger.info("Random Circle Speed Timer gestoppt (Skript geladen).")
else: # Keine Skripte geladen
# Wenn Circle das aktuelle Pattern ist und der Generator läuft, starte den Timer neu
if isinstance(self.pattern, CirclePattern) and self.timer.isActive():
if not self.random_circle_speed_timer.isActive():
self.random_circle_speed_timer.start()
logger.info(f"Random Circle Speed Timer für ID {id(self.pattern)} gestartet (Skript entladen, Circle aktiv, Generator an).")
def any_scripts_loaded(self):
return (self.script_alpha is not None) or (self.script_beta is not None)
def set_velocity(self, velocity):
try:
self.velocity = float(velocity) # Sicherstellen, dass es ein Float ist
logger.info(f"Globale Geschwindigkeit auf: {self.velocity:.2f} gesetzt.")
except ValueError:
logger.error(f"Ungültiger Wert für globale Geschwindigkeit: {velocity}")
@Slot()
def timeout(self):
current_time = time.time()
dt = current_time - self.last_update_time
self.last_update_time = current_time
# Umgang mit dt (Delta Time)
target_fps_setting = qt_ui.settings.display_fps.get()
target_fps = 60.0 # Fallback
if isinstance(target_fps_setting, (int, float)) and target_fps_setting > 0:
target_fps = target_fps_setting
min_dt_for_calc = 1.0 / target_fps
max_dt = 0.1 # Maximal erlaubter dt, z.B. 100ms
if dt <= 0:
# logger.debug(f"dt war {dt:.4f}s, wird auf {min_dt_for_calc:.4f}s (aus FPS) gesetzt.")
dt = min_dt_for_calc
elif dt > max_dt:
logger.warning(f"Großer dt erkannt ({dt:.3f}s), begrenzt auf {max_dt:.3f}s.")
dt = max_dt
if not self.any_scripts_loaded():
if isinstance(self.pattern, MousePattern):
if self.pattern.last_position_is_mouse_position():
pass # Nichts zu tun, Maus hat sich nicht bewegt oder wurde schon verarbeitet
else:
# Fall für tcode-Positionen oder andere Updates über MousePattern
a = self.alpha.interpolate(current_time - self.latency)
b = self.beta.interpolate(current_time - self.latency)
self.position_updated.emit(a, b)
else: # Gilt für CirclePattern und andere zeitbasierte Patterns
a_pattern, b_pattern = self.pattern.update(dt * self.velocity)
self.alpha.add(a_pattern)
self.beta.add(b_pattern)
a_display = self.alpha.interpolate(current_time - self.latency)
b_display = self.beta.interpolate(current_time - self.latency)
self.position_updated.emit(a_display, b_display)
else: # Scripts sind geladen
a_val = self.alpha.interpolate(current_time - self.latency) # Standardwert
b_val = self.beta.interpolate(current_time - self.latency) # Standardwert
if self.script_alpha:
a_val = self.script_alpha.interpolate(current_time - self.latency)
if self.script_beta:
b_val = self.script_beta.interpolate(current_time - self.latency)
self.position_updated.emit(a_val, b_val)
def mouse_event(self, a, b): # Parameter umbenannt, um Klarheit zu schaffen
if isinstance(self.pattern, MousePattern) and not self.any_scripts_loaded():
self.pattern.mouse_event(a, b)
# Die alpha/beta Achsen werden durch pattern.mouse_event() aktualisiert.
# Sofortiges Senden der Mausposition an die UI.
self.position_updated.emit(a, b)
def refreshSettings(self):
try:
fps_setting = qt_ui.settings.display_fps.get()
if not isinstance(fps_setting, (int, float)) or fps_setting <= 0:
logger.warning(f"Ungültiger display_fps Wert '{fps_setting}', verwende Fallback: 60.0")
fps = 60.0
else:
fps = float(fps_setting)
except Exception as e:
logger.error(f"Fehler beim Lesen von display_fps: {e}. Verwende Fallback 60.0")
fps = 60.0
self.timer.setInterval(int(1000 // np.clip(fps, 1.0, 500.0)))
try:
latency_setting = qt_ui.settings.display_latency.get()
if not isinstance(latency_setting, (int, float)) or latency_setting < 0:
logger.warning(f"Ungültiger display_latency Wert '{latency_setting}', verwende Fallback: 0.0")
latency_ms = 0.0
else:
latency_ms = float(latency_setting)
except Exception as e:
logger.error(f"Fehler beim Lesen von display_latency: {e}. Verwende Fallback 0.0")
latency_ms = 0.0
self.latency = latency_ms / 1000.0
logger.info(f"Einstellungen aktualisiert: Timer Intervall {self.timer.interval()}ms (Ziel-FPS: {fps:.1f}), Latenz {self.latency*1000:.1f}ms")
TODO: re-instate old patterns and add more (Original-Kommentar)
"""
(Original-Kommentarblock mit alten Pattern-Definitionen)
"""