Skip to content

Commit

Permalink
feat(linwave): set_tr_samples
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasberbuer committed Dec 10, 2024
1 parent e115b48 commit e36bbc8
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 6 deletions.
25 changes: 25 additions & 0 deletions src/waveline/linwave.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,31 @@ async def set_tr_decimation(self, channel: int, factor: int):
self._channel_settings[1].decimation = factor
self._channel_settings[2].decimation = factor

@_require_connected
async def set_tr_samples(
self, channel: int, min_samples: int = 0, max_samples: int | None = None
):
"""
Set number of samples (limits) for transient data.
Per default, the number of samples matches the hit-duration (duration-adapted recording).
With this command, the number of TR samples can be defined with limits (minimum, maximum).
Fixing the length of transient data can be particularly useful for applications such as
frequency analysis, where a consistent frequency resolution is required.
Args:
channel: Channel number (0 for all channels)
min_samples: Minimum number of samples. Default is `0` (duration-adapted).
Set to `0` to disable a fixed minimum limit.
max_samples: Maximum number of samples. Default is `None`, which sets `max_samples`
equal to `min_samples`.
Set to `0` to disable a fixed maximum limit.
"""
self._check_channel_number(channel)
min_samples = int(min_samples)
max_samples = min_samples if max_samples is None else int(max_samples)
await self._send_command(f"set_acq tr_samples {min_samples} {max_samples} @{channel:d}")

@_require_connected
async def set_tr_pretrigger(self, channel: int, samples: int):
"""
Expand Down
45 changes: 39 additions & 6 deletions tests/system/linwave/test_acquisition.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ async def test_acq_continuous_mode(lw, channel):
await lw.set_ddt(0, ddt)
await lw.set_tr_enabled(0, True)
await lw.set_tr_decimation(0, decimation)
await lw.set_tr_samples(0, 0) # adaptive

await lw.start_acquisition()
await asyncio.sleep(acq_duration)
Expand All @@ -122,6 +123,41 @@ async def test_acq_continuous_mode(lw, channel):
assert record.samples == expected_samples


@pytest.mark.xfail(reason="only available since firmware version 2.32")
@pytest.mark.parametrize(
("ddt_samples", "min_samples", "max_samples", "expected_samples"),
[
(10_000, 0, None, 10_000),
(10_000, 0, 0, 10_000),
(10_000, 0, 5000, 5_000),
(10_000, 0, 20_000, 10_000),
(10_000, 20_000, None, 20_000),
(10_000, 20_000, 0, 20_000),
],
)
async def test_acq_tr_samples(lw, ddt_samples, min_samples, max_samples, expected_samples):
await lw.set_channel(0, True)
await lw.set_status_interval(0, 0)
await lw.set_continuous_mode(0, True)
await lw.set_ddt(0, ddt_samples) # samples = µs for 1 MHz sampling rate
await lw.set_tr_enabled(0, True)
await lw.set_tr_decimation(0, 10)
await lw.set_tr_samples(0, min_samples, max_samples)

await lw.start_acquisition()
await asyncio.sleep(0.1)
await lw.stop_acquisition()
await asyncio.sleep(0.1)

ae_data = await lw.get_ae_data()
tr_data = await lw.get_tr_data()

print(ae_data)
print(tr_data)

assert all([tr.samples == expected_samples for tr in tr_data]) # noqa: C419


@pytest.mark.xfail(reason="only available since firmware version 2.13")
@pytest.mark.parametrize("channel", [0, 1, 2])
@pytest.mark.parametrize("samples", [0, 1_000, 1_000_000])
Expand Down Expand Up @@ -154,19 +190,17 @@ async def test_pulsing(lw, channel, interval, count):
await lw.set_continuous_mode(0, False)
await lw.set_ddt(0, 1000)
await lw.set_threshold(0, 10_000)
await lw.set_tr_enabled(0, True)
await lw.set_tr_decimation(0, 100)
await lw.set_tr_enabled(0, False)

await lw.start_acquisition()
await lw.start_pulsing(channel, interval, count)
await asyncio.sleep(count * interval + 0.1)
await lw.stop_acquisition()
await asyncio.sleep(0.1)

ae_data = await lw.get_ae_data()
tr_data = await lw.get_tr_data()

assert len(ae_data) == count
assert len(tr_data) == count


@pytest.mark.parametrize("interval", [0.1, 0.5])
Expand All @@ -189,9 +223,8 @@ async def test_stop_infinite_pulsing(lw, channel, interval):
await lw.stop_pulsing()
await asyncio.sleep(acq_time) # now no new hits should be generated
await lw.stop_acquisition()
await asyncio.sleep(0.1)

ae_data = await lw.get_ae_data()
tr_data = await lw.get_tr_data()

assert len(ae_data) == pytest.approx(expected_pulse_count, abs=1)
assert len(tr_data) == 0
15 changes: 15 additions & 0 deletions tests/unit/test_linwave.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,21 @@ async def test_set_filter_invalid_channel(mock_objects, channel):
await mock_objects.lw.set_filter(channel, 50e3, 300e3, 4)


@pytest.mark.parametrize(
("channel", "min_samples", "max_samples", "command"),
[
(0, 0, None, b"set_acq tr_samples 0 0 @0\n"),
(0, 2048, None, b"set_acq tr_samples 2048 2048 @0\n"),
(1, 1024, 2048, b"set_acq tr_samples 1024 2048 @1\n"),
(1, 1024, 0, b"set_acq tr_samples 1024 0 @1\n"),
],
)
async def test_set_tr_samples(mock_objects, channel, min_samples, max_samples, command):
lw, _, writer = mock_objects
await lw.set_tr_samples(channel, min_samples, max_samples)
writer.write.assert_called_with(command)


@pytest.mark.parametrize(
("channel", "samples", "command"),
[
Expand Down

0 comments on commit e36bbc8

Please sign in to comment.