Skip to content

Commit 1a2b254

Browse files
committed
reworked on given comments
1 parent fbc48f2 commit 1a2b254

7 files changed

+78
-60
lines changed
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
"""Helper functions for the analog_out example."""
2+
3+
import numpy
4+
import numpy.typing
5+
6+
np = numpy
7+
8+
9+
def create_sine_wave(
10+
frequency: float, amplitude: float, sampling_rate: float, duration: float
11+
) -> numpy.typing.NDArray[numpy.double]:
12+
"""Generate a sine wave."""
13+
t = np.arange(0, duration, 1 / sampling_rate)
14+
15+
return amplitude * np.sin(2 * np.pi * frequency * t)
16+
17+
18+
def create_voltage_sample(voltage: float, number_of_samples: int) -> float:
19+
"""Generate a voltage sample."""
20+
data = []
21+
for i in range(number_of_samples):
22+
data.append(voltage * i / number_of_samples)
23+
24+
return data

examples/analog_out/cont_gen_voltage_wfm_int_clk.py

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,29 +4,22 @@
44
waveform using an internal sample clock.
55
"""
66

7-
import numpy as np
7+
from analog_out_helper import create_sine_wave
88

99
import nidaqmx
1010
from nidaqmx.constants import AcquisitionType
1111

1212
with nidaqmx.Task() as task:
13-
frequency = 10
14-
amplitude = 1
15-
sampling_rate = 100
16-
duration = 1
17-
18-
# generate the time array
19-
t = np.arange(0, duration, 1 / sampling_rate)
20-
# Generate the sine wave
21-
data = amplitude * np.sin(2 * np.pi * frequency * t)
13+
sampling_rate = 1000.0
14+
data = create_sine_wave(
15+
frequency=10.0, amplitude=1.0, sampling_rate=sampling_rate, duration=1.0
16+
)
2217

2318
task.ao_channels.add_ao_voltage_chan("Dev1/ao0")
24-
task.timing.cfg_samp_clk_timing(
25-
1000, sample_mode=AcquisitionType.CONTINUOUS, samps_per_chan=100
26-
)
19+
task.timing.cfg_samp_clk_timing(sampling_rate, sample_mode=AcquisitionType.CONTINUOUS)
2720
task.write(data)
2821
task.start()
2922

30-
input("Running task. Press Enter to stop.\n")
23+
input("Generating voltage continuously. Press Enter to stop.\n")
3124

3225
task.stop()

examples/analog_out/cont_gen_voltage_wfm_int_clk_every_n_samples_event.py

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,35 +6,29 @@
66
the specified number of samples generation is complete.
77
"""
88

9-
import numpy as np
9+
from analog_out_helper import create_sine_wave
1010

1111
import nidaqmx
1212
from nidaqmx.constants import AcquisitionType
1313

1414
with nidaqmx.Task() as task:
15-
frequency = 10
16-
amplitude = 1
17-
sampling_rate = 100
18-
duration = 1
19-
20-
# generate the time array
21-
t = np.arange(0, duration, 1 / sampling_rate)
22-
# Generate the sine wave
23-
data = amplitude * np.sin(2 * np.pi * frequency * t)
15+
sampling_rate = 1000.0
16+
data = create_sine_wave(
17+
frequency=10.0, amplitude=1.0, sampling_rate=sampling_rate, duration=1.0
18+
)
2419

2520
def callback(task_handle, every_n_samples_event_type, number_of_samples, callback_data):
2621
"""Callback function for written data."""
27-
print(f"transferred {number_of_samples} samples event invoked.")
22+
print("Transferred N samples")
23+
2824
return 0
2925

3026
task.ao_channels.add_ao_voltage_chan("Dev1/ao0")
31-
task.timing.cfg_samp_clk_timing(
32-
1000, sample_mode=AcquisitionType.CONTINUOUS, samps_per_chan=100
33-
)
34-
task.register_every_n_samples_transferred_from_buffer_event(100, callback)
27+
task.timing.cfg_samp_clk_timing(sampling_rate, sample_mode=AcquisitionType.CONTINUOUS)
28+
task.register_every_n_samples_transferred_from_buffer_event(1000, callback)
3529
task.write(data)
3630
task.start()
3731

38-
input("Running task. Press Enter to stop.\n")
32+
input("Generating voltage continuously. Press Enter to stop.\n")
3933

4034
task.stop()

examples/analog_out/cont_gen_voltage_wfm_int_clk_non_regen.py

Lines changed: 17 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,38 @@
11
"""Example of analog output voltage generation.
22
3-
This example demonstrates how to output a continuous periodic
4-
waveform with new data using an internal sample clock while
5-
the task is running.
3+
This example demonstrates how to continuously generate an
4+
analog output waveform by providing new data to the output buffer
5+
as the task is running.
6+
7+
This example is useful if you want to generate a non-repeating waveform,
8+
make updates on-the-fly, or generate a frequency that is not an
9+
even divide-down of your sample clock. In this example,
10+
the default frequency value is 17.0 to demonstrate that non-regenerative output
11+
can be used to create a signal with a frequency that is not an even divide-down
12+
of your sample clock.
613
"""
714

8-
import time
9-
10-
import numpy as np
15+
from analog_out_helper import create_sine_wave
1116

1217
import nidaqmx
1318
from nidaqmx.constants import AcquisitionType, RegenerationMode
1419

1520

16-
def get_sine_wave_data(cycle):
17-
"""Generate a sine wave data."""
18-
frequency = (10, 50)
19-
amplitude = (1, 3)
20-
sampling_rate = (100, 500)
21-
duration = 1
22-
selection = cycle % 2
23-
24-
# generate the time array
25-
t = np.arange(0, duration, 1 / sampling_rate[selection])
26-
# Generate the sine wave
27-
data = amplitude[selection] * np.sin(2 * np.pi * frequency[selection] * t)
28-
return data
29-
30-
3121
with nidaqmx.Task() as task:
3222
is_first_run = True
23+
sampling_rate = 1000.0
3324
task.ao_channels.add_ao_voltage_chan("Dev1/ao0")
3425
task.out_stream.regen_mode = RegenerationMode.DONT_ALLOW_REGENERATION
35-
task.timing.cfg_samp_clk_timing(
36-
1000, sample_mode=AcquisitionType.CONTINUOUS, samps_per_chan=100
37-
)
26+
task.timing.cfg_samp_clk_timing(sampling_rate, sample_mode=AcquisitionType.CONTINUOUS)
3827

3928
try:
4029
cycle = 1
41-
print("Starting task. Press Ctrl+C to stop.")
42-
time.sleep(1.0)
30+
print("Generating voltage continuously. Press Ctrl+C to stop.")
4331

4432
while True:
45-
data = get_sine_wave_data(cycle)
33+
data = create_sine_wave(
34+
frequency=17.0, amplitude=1.0, sampling_rate=sampling_rate, duration=1.0
35+
)
4636
task.write(data)
4737
cycle = cycle + 1
4838
if is_first_run:

examples/analog_out/gen_voltage_wfm_int_clk.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,21 @@
55
sample clock.
66
"""
77

8+
from analog_out_helper import create_voltage_sample
9+
810
import nidaqmx
911
from nidaqmx.constants import AcquisitionType
1012

1113
with nidaqmx.Task() as task:
14+
data = []
15+
total_samples = 1000
1216
task.ao_channels.add_ao_voltage_chan("Dev1/ao0")
13-
task.timing.cfg_samp_clk_timing(1000, sample_mode=AcquisitionType.FINITE, samps_per_chan=1000)
17+
task.timing.cfg_samp_clk_timing(
18+
1000, sample_mode=AcquisitionType.FINITE, samps_per_chan=total_samples
19+
)
1420

15-
print(f"Generate {task.write([1.1, 2.2, 3.3, 4.4, 5.5])} voltage samples.")
21+
data = create_voltage_sample(5.0, total_samples)
22+
number_of_samples_written = task.write(data, auto_start=True)
23+
print(f"Generate {number_of_samples_written} voltage samples.")
1624
task.wait_until_done()
1725
task.stop()

examples/analog_out/gen_voltage_sample.py renamed to examples/analog_out/voltage_update.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,3 @@
1010
task.ao_channels.add_ao_voltage_chan("Dev1/ao0")
1111

1212
print(f"Generate {task.write(1.1)} voltage sample.")
13-
task.stop()

tests/acceptance/test_examples.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
from __future__ import annotations
22

33
import contextlib
4+
import os
45
import re
56
import runpy
7+
import sys
68
import warnings
79
from pathlib import Path
810

@@ -29,6 +31,8 @@ def test___shipping_example___run___no_errors(example_path: Path, system):
2931
)
3032
if example_path.name == "ci_pulse_freq.py":
3133
pytest.skip("Example times out if there is no signal.")
34+
if example_path.name == "analog_out_helper.py":
35+
pytest.skip("Helper for analog output.")
3236
if re.search(r"\binput\(|\bKeyboardInterrupt\b", example_source):
3337
pytest.skip("Example waits for keyboard input.")
3438
if example_path.name == "nidaqmx_warnings.py":
@@ -38,6 +42,12 @@ def test___shipping_example___run___no_errors(example_path: Path, system):
3842
context_manager = contextlib.nullcontext()
3943

4044
with context_manager:
45+
example_dir = os.path.dirname(os.path.abspath(example_path))
46+
if example_dir not in sys.path:
47+
# Solving ModuleNotFoundError.
48+
# Add the example directory to sys.path so that the
49+
# helper module can be found.
50+
sys.path.append(example_dir)
4151
runpy.run_path(str(example_path))
4252

4353

0 commit comments

Comments
 (0)