diff --git a/examples/multi-functions-synchronization/ai_ao_sync.py b/examples/multi-functions-synchronization/ai_ao_sync.py deleted file mode 100644 index 4ff108fa..00000000 --- a/examples/multi-functions-synchronization/ai_ao_sync.py +++ /dev/null @@ -1,98 +0,0 @@ -"""Example of analog input and output synchronization. - -This example demonstrates how to continuously acquire and -generate data at the same time, synchronized with one another. -""" - -from typing import Tuple - -import numpy as np -import numpy.typing - -import nidaqmx -from nidaqmx.constants import AcquisitionType - - -def generate_sine_wave( - frequency: float, - amplitude: float, - sampling_rate: float, - phase_in: float, - number_of_samples: int, -) -> Tuple[numpy.typing.NDArray[numpy.double], float]: - """Generates a sine wave with a specified phase. - - Args: - frequency (float): Specifies the frequency of the sine wave. - amplitude (float): Specifies the amplitude of the sine wave. - sampling_rate (float): Specifies the sampling rate of the sine wave. - phase_in (float): Specifies the phase of the sine wave in radians. - number_of_samples (int): Specifies the number of samples to generate. - - Returns: - Tuple[numpy.typing.NDArray[numpy.double], float]: Indicates a tuple - containing the generated data and the phase of the sine wave after generation. - """ - duration_time = number_of_samples / sampling_rate - duration_radians = duration_time * 2 * np.pi - phase_out = (phase_in + duration_radians) % (2 * np.pi) - t = np.linspace(phase_in, phase_in + duration_radians, number_of_samples, endpoint=False) - - return (amplitude * np.sin(frequency * t), phase_out) - - -def main(): - """Continuously acquires and generate data at the same time.""" - total_read = 0 - phase = 0.0 - number_of_samples = 1000 - task_ai = nidaqmx.Task() - task_ao = nidaqmx.Task() - - def callback(task_handle, every_n_samples_event_type, number_of_samples, callback_data): - """Callback function for reading singals.""" - nonlocal total_read - read = task_ai.read(number_of_samples_per_channel=number_of_samples) - total_read += len(read) - print(f"Acquired data: {len(read)} samples. Total {total_read}.", end="\r") - - return 0 - - task_ai.ai_channels.add_ai_voltage_chan("Dev1/ai0") - task_ai.timing.cfg_samp_clk_timing(1000.0, sample_mode=AcquisitionType.CONTINUOUS) - task_ai.register_every_n_samples_acquired_into_buffer_event(1000, callback) - task_ao.ao_channels.add_ao_voltage_chan("Dev1/ao0") - task_ao.timing.cfg_samp_clk_timing(1000.0, sample_mode=AcquisitionType.CONTINUOUS) - task_ao.triggers.start_trigger.cfg_dig_edge_start_trig("/Dev1/ai/StartTrigger") - - actual_sampling_rate = task_ao.timing.samp_clk_rate - print(f"Actual sampling rate: {actual_sampling_rate:g} S/s") - - ao_data, phase = generate_sine_wave( - frequency=10.0, - amplitude=1.0, - sampling_rate=actual_sampling_rate, - phase_in=phase, - number_of_samples=number_of_samples, - ) - - try: - task_ao.write(ao_data) - task_ao.start() - task_ai.start() - - input("Acquiring samples continuously. Press Enter to stop.\n") - - except nidaqmx.DaqError as e: - print(e) - finally: - task_ai.stop() - task_ao.stop() - task_ai.close() - task_ao.close() - - print(f"\nAcquired {total_read} total samples.") - - -if __name__ == "__main__": - main() diff --git a/examples/multi-functions-synchronization/cont_ai_di_acq.py b/examples/multi-functions-synchronization/cont_ai_di_acq.py deleted file mode 100644 index 133561ba..00000000 --- a/examples/multi-functions-synchronization/cont_ai_di_acq.py +++ /dev/null @@ -1,57 +0,0 @@ -"""Example of analog and digital data acquisition at the same time. - -This example demonstrates how to continuously acquire analog and -digital data at the same time, synchronized with one another on -the same device. -""" - -import nidaqmx -from nidaqmx.constants import AcquisitionType, LineGrouping - - -def main(): - """Continuously acquire analog and digital data at the same time.""" - total_ai_read = 0 - total_di_read = 0 - task_ai = nidaqmx.Task() - task_di = nidaqmx.Task() - - def callback(task_handle, every_n_samples_event_type, number_of_samples, callback_data): - """Callback function for reading signals.""" - nonlocal total_ai_read - nonlocal total_di_read - ai_read = task_ai.read(number_of_samples_per_channel=1000) - di_read = task_di.read(number_of_samples_per_channel=1000) - total_ai_read += len(ai_read) - total_di_read += len(di_read) - print(f"\t{len(ai_read)}\t{len(di_read)}\t\t{total_ai_read}\t{total_di_read}", end="\r") - - return 0 - - task_ai.ai_channels.add_ai_voltage_chan("Dev1/ai0") - task_ai.timing.cfg_samp_clk_timing(1000.0, sample_mode=AcquisitionType.CONTINUOUS) - task_ai.register_every_n_samples_acquired_into_buffer_event(1000, callback) - task_di.di_channels.add_di_chan("Dev1/port0", line_grouping=LineGrouping.CHAN_FOR_ALL_LINES) - task_di.timing.cfg_samp_clk_timing( - 1000.0, "/Dev1/ai/SampleClock", sample_mode=AcquisitionType.CONTINUOUS - ) - - try: - task_di.start() - task_ai.start() - - print("Acquiring samples continuously. Press Enter to stop.\n") - print("Read:\tAI\tDI\tTotal:\tAI\tDI") - input() - - except nidaqmx.DaqError as e: - print(e) - finally: - task_ai.stop() - task_di.stop() - task_ai.close() - task_di.close() - - -if __name__ == "__main__": - main() diff --git a/examples/synchronization/multi_function/ai_ao_sync.py b/examples/synchronization/multi_function/ai_ao_sync.py new file mode 100644 index 00000000..ab0da3a8 --- /dev/null +++ b/examples/synchronization/multi_function/ai_ao_sync.py @@ -0,0 +1,94 @@ +"""Example of analog input and output synchronization. + +This example demonstrates how to continuously acquire and +generate data at the same time, synchronized with one another. +""" + +from typing import Tuple + +import numpy as np +import numpy.typing + +import nidaqmx +from nidaqmx.constants import AcquisitionType + + +def generate_sine_wave( + frequency: float, + amplitude: float, + sampling_rate: float, + number_of_samples: int, + phase_in: float = 0.0, +) -> Tuple[numpy.typing.NDArray[numpy.double], float]: + """Generates a sine wave with a specified phase. + + Args: + frequency (float): Specifies the frequency of the sine wave. + amplitude (float): Specifies the amplitude of the sine wave. + sampling_rate (float): Specifies the sampling rate of the sine wave. + number_of_samples (int): Specifies the number of samples to generate. + phase_in (Optional[float]): Specifies the phase of the sine wave in radians. + + Returns: + Tuple[numpy.typing.NDArray[numpy.double], float]: Indicates a tuple + containing the generated data and the phase of the sine wave after generation. + """ + duration_time = number_of_samples / sampling_rate + duration_radians = duration_time * 2 * np.pi + phase_out = (phase_in + duration_radians) % (2 * np.pi) + t = np.linspace(phase_in, phase_in + duration_radians, number_of_samples, endpoint=False) + + return (amplitude * np.sin(frequency * t), phase_out) + + +def main(): + """Continuously acquires and generate data at the same time.""" + total_read = 0 + number_of_samples = 1000 + + with nidaqmx.Task() as ai_task, nidaqmx.Task() as ao_task: + + def callback(task_handle, every_n_samples_event_type, number_of_samples, callback_data): + """Callback function for reading signals.""" + nonlocal total_read + read = ai_task.read(number_of_samples_per_channel=number_of_samples) + total_read += len(read) + print(f"Acquired data: {len(read)} samples. Total {total_read}.", end="\r") + + return 0 + + ai_task.ai_channels.add_ai_voltage_chan("Dev1/ai0") + ai_task.timing.cfg_samp_clk_timing(1000.0, sample_mode=AcquisitionType.CONTINUOUS) + ai_task.register_every_n_samples_acquired_into_buffer_event(1000, callback) + ao_task.ao_channels.add_ao_voltage_chan("Dev1/ao0") + ao_task.timing.cfg_samp_clk_timing(1000.0, sample_mode=AcquisitionType.CONTINUOUS) + ao_task.triggers.start_trigger.cfg_dig_edge_start_trig("ai/StartTrigger") + + actual_sampling_rate = ao_task.timing.samp_clk_rate + print(f"Actual sampling rate: {actual_sampling_rate:g} S/s") + + ao_data, _ = generate_sine_wave( + frequency=10.0, + amplitude=1.0, + sampling_rate=actual_sampling_rate, + number_of_samples=number_of_samples, + ) + + try: + ao_task.write(ao_data) + ao_task.start() + ai_task.start() + + input("Acquiring samples continuously. Press Enter to stop.\n") + + except nidaqmx.DaqError as e: + print(e) + finally: + ai_task.stop() + ao_task.stop() + + print(f"\nAcquired {total_read} total samples.") + + +if __name__ == "__main__": + main() diff --git a/examples/synchronization/multi_function/cont_ai_di_acq.py b/examples/synchronization/multi_function/cont_ai_di_acq.py new file mode 100644 index 00000000..98f0cdd8 --- /dev/null +++ b/examples/synchronization/multi_function/cont_ai_di_acq.py @@ -0,0 +1,57 @@ +"""Example of analog and digital data acquisition at the same time. + +This example demonstrates how to continuously acquire analog and +digital data at the same time, synchronized with one another on +the same device. +""" + +import nidaqmx +from nidaqmx.constants import AcquisitionType, LineGrouping + + +def main(): + """Continuously acquire analog and digital data at the same time.""" + total_ai_read = 0 + total_di_read = 0 + + with nidaqmx.Task() as ai_task, nidaqmx.Task() as di_task: + + def callback(task_handle, every_n_samples_event_type, number_of_samples, callback_data): + """Callback function for reading signals.""" + nonlocal total_ai_read + nonlocal total_di_read + ai_read = ai_task.read(number_of_samples_per_channel=1000) + di_read = di_task.read(number_of_samples_per_channel=1000) + total_ai_read += len(ai_read) + total_di_read += len(di_read) + print(f"\t{len(ai_read)}\t{len(di_read)}\t\t{total_ai_read}\t{total_di_read}", end="\r") + + return 0 + + ai_task.ai_channels.add_ai_voltage_chan("Dev1/ai0") + ai_task.timing.cfg_samp_clk_timing(1000.0, sample_mode=AcquisitionType.CONTINUOUS) + ai_task.register_every_n_samples_acquired_into_buffer_event(1000, callback) + di_task.di_channels.add_di_chan("Dev1/port0", line_grouping=LineGrouping.CHAN_FOR_ALL_LINES) + di_task.timing.cfg_samp_clk_timing( + 1000.0, "ai/SampleClock", sample_mode=AcquisitionType.CONTINUOUS + ) + + try: + di_task.start() + ai_task.start() + + print("Acquiring samples continuously. Press Enter to stop.\n") + print("Read:\tAI\tDI\tTotal:\tAI\tDI") + input() + + except nidaqmx.DaqError as e: + print(e) + finally: + ai_task.stop() + di_task.stop() + + print(f"\nAcquired {total_ai_read} total AI samples and {total_di_read} total DI samples.") + + +if __name__ == "__main__": + main()