Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 17 additions & 14 deletions sendspin/audio_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,13 +382,6 @@ def _start_audio_worker(self, client: SendspinClient) -> None:
if not self._audio_worker.is_running():
raise RuntimeError("Audio worker failed to start")

def _require_audio_worker(self) -> _AudioSyncWorker:
"""Get a running audio worker or raise immediately."""
worker = self._audio_worker
if worker is None or not worker.is_running():
raise RuntimeError("Audio worker is not running")
return worker

def _clear_audio_worker(self) -> None:
"""Clear worker queue when worker is available."""
worker = self._audio_worker
Expand All @@ -399,8 +392,9 @@ def _on_audio_chunk(
self, server_timestamp_us: int, audio_data: bytes | bytearray, fmt: AudioFormat
) -> None:
"""Handle incoming audio chunks by enqueueing them to the sync worker."""
assert self._client is not None, "Received audio chunk but client is not attached"
worker = self._require_audio_worker()
worker = self._audio_worker
if worker is None or not worker.is_running():
raise RuntimeError("Audio worker is not running")

pcm_format = fmt.pcm_format
if self._current_format != fmt:
Expand All @@ -417,6 +411,11 @@ def _on_audio_chunk(

def _on_stream_start(self, _message: StreamStartMessage) -> None:
"""Handle stream start by clearing stale audio chunks."""
assert self._client is not None, "Received stream start but client is not attached"
if self._audio_worker is None or not self._audio_worker.is_running():
self._audio_worker = None
self._start_audio_worker(self._client)

self._clear_audio_worker()

if not self._stream_active:
Expand Down Expand Up @@ -445,11 +444,8 @@ def clear_queue(self) -> None:
"""Clear the audio queue to prevent desync."""
self._clear_audio_worker()

async def cleanup(self) -> None:
"""Stop audio worker, hardware monitoring, and clear resources."""
if self._hw_volume is not None:
await self._hw_volume.stop_monitoring()

async def handle_disconnect(self) -> None:
"""Reset connection-scoped audio state after a disconnect."""
if self._stream_active:
self._stream_active = False
if self._on_event:
Expand All @@ -461,3 +457,10 @@ async def cleanup(self) -> None:

self._current_format = None
self.audio_player = None

async def shutdown(self) -> None:
"""Stop audio worker, hardware monitoring, and clear resources."""
await self.handle_disconnect()

if self._hw_volume is not None:
await self._hw_volume.stop_monitoring()
12 changes: 8 additions & 4 deletions sendspin/daemon/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,11 @@ def signal_handler() -> None:
except asyncio.CancelledError:
logger.debug("Daemon cancelled")
finally:
await self._stop_mpris_and_audio()
if self._mpris is not None:
self._mpris.stop()
self._mpris = None
if self._audio_handler is not None:
await self._audio_handler.shutdown()
if self._client is not None:
await self._client.disconnect()
self._client = None
Expand Down Expand Up @@ -201,12 +205,12 @@ async def _run_server_initiated(self, static_delay_ms: float) -> None:
await asyncio.sleep(3600)

async def _stop_mpris_and_audio(self) -> None:
"""Stop MPRIS and cleanup audio handler."""
"""Stop MPRIS and reset connection-scoped audio state."""
if self._mpris is not None:
self._mpris.stop()
self._mpris = None
if self._audio_handler is not None:
await self._audio_handler.cleanup()
await self._audio_handler.handle_disconnect()

async def _handle_server_connection(self, ws: web.WebSocketResponse) -> None:
"""Handle an incoming server connection."""
Expand Down Expand Up @@ -293,7 +297,7 @@ async def _connection_loop(self, url: str) -> None:

# Connection dropped
logger.info("Disconnected from server")
await self._audio_handler.cleanup()
await self._audio_handler.handle_disconnect()

logger.info("Reconnecting to %s", url)

Expand Down
4 changes: 2 additions & 2 deletions sendspin/tui/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ def signal_handler() -> None:
if self._ui:
self._ui.stop()
if self._audio_handler:
await self._audio_handler.cleanup()
await self._audio_handler.shutdown()
assert self._client is not None
await self._client.disconnect()
await self._discovery.stop()
Expand Down Expand Up @@ -510,7 +510,7 @@ async def _connection_loop(self, *, already_connected: bool = False) -> None:
ui.set_disconnected("Connection lost")

# Clean up audio state
await audio_handler.cleanup()
await audio_handler.handle_disconnect()

# Check for pending URL from server selection first
pending_server = manager.consume_pending_server()
Expand Down
127 changes: 127 additions & 0 deletions tests/test_audio_connector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
from __future__ import annotations

import asyncio
from types import SimpleNamespace

import sendspin.audio_connector as audio_connector
from sendspin.audio_connector import AudioStreamHandler


class _FakeWorker:
instances: list[_FakeWorker] = []

def __init__(
self,
*,
audio_device: object,
use_software_volume: bool,
volume: int,
muted: bool,
) -> None:
self.audio_device = audio_device
self.use_software_volume = use_software_volume
self.volume = volume
self.muted = muted
self.running = False
self.submitted: list[tuple[int, bytes | bytearray, object]] = []
_FakeWorker.instances.append(self)

def start(self, compute_play_time: object, compute_server_time: object) -> None:
self.running = True
self.compute_play_time = compute_play_time
self.compute_server_time = compute_server_time

def is_running(self) -> bool:
return self.running

def submit_chunk(
self, server_timestamp_us: int, audio_data: bytes | bytearray, fmt: object
) -> None:
self.submitted.append((server_timestamp_us, audio_data, fmt))

def clear(self) -> None:
return

def set_volume(self, volume: int, *, muted: bool) -> None:
self.volume = volume
self.muted = muted

async def stop(self) -> None:
self.running = False


class _FakeClient:
def __init__(self) -> None:
self.connected = True
self.audio_chunk_listeners: list[object] = []
self.stream_start_listeners: list[object] = []
self.stream_end_listeners: list[object] = []
self.stream_clear_listeners: list[object] = []

def compute_play_time(self, timestamp_us: int) -> int:
return timestamp_us

def compute_server_time(self, timestamp_us: int) -> int:
return timestamp_us

async def send_player_state(self, **_: object) -> None:
return

def add_audio_chunk_listener(self, callback: object):
return self._add_listener(self.audio_chunk_listeners, callback)

def add_stream_start_listener(self, callback: object):
return self._add_listener(self.stream_start_listeners, callback)

def add_stream_end_listener(self, callback: object):
return self._add_listener(self.stream_end_listeners, callback)

def add_stream_clear_listener(self, callback: object):
return self._add_listener(self.stream_clear_listeners, callback)

@staticmethod
def _add_listener(callbacks: list[object], callback: object):
callbacks.append(callback)
return lambda: None


def _make_format() -> SimpleNamespace:
return SimpleNamespace(
codec=SimpleNamespace(value="pcm"),
pcm_format=SimpleNamespace(sample_rate=48_000, bit_depth=16, channels=2),
)


def test_audio_worker_restarts_on_stream_start_after_disconnect(monkeypatch) -> None:
monkeypatch.setattr(audio_connector, "_AudioSyncWorker", _FakeWorker)
_FakeWorker.instances.clear()

async def exercise() -> None:
handler = AudioStreamHandler(
audio_device=SimpleNamespace(index=0, name="Fake Device"),
volume=10,
muted=False,
)
client = _FakeClient()
handler.attach_client(client)
handler.set_volume(37, muted=True)
await asyncio.sleep(0)

await handler.handle_disconnect()
assert len(_FakeWorker.instances) == 1
assert not _FakeWorker.instances[0].running

fmt = _make_format()
handler._on_stream_start(object())

assert len(_FakeWorker.instances) == 2
restarted_worker = _FakeWorker.instances[1]
assert restarted_worker.running
assert restarted_worker.volume == 37
assert restarted_worker.muted is True

handler._on_audio_chunk(123_456, b"payload", fmt)

assert restarted_worker.submitted == [(123_456, b"payload", fmt)]

asyncio.run(exercise())