diff --git a/examples/synchronization/multi_function/ai_ao_sync.py b/examples/synchronization/multi_function/ai_ao_sync.py index ab0da3a8..3b6592e8 100644 --- a/examples/synchronization/multi_function/ai_ao_sync.py +++ b/examples/synchronization/multi_function/ai_ao_sync.py @@ -10,7 +10,27 @@ import numpy.typing import nidaqmx -from nidaqmx.constants import AcquisitionType +from nidaqmx.constants import AcquisitionType, ProductCategory + + +def get_terminal_name_with_dev_prefix(task: nidaqmx.Task, terminal_name: str) -> str: + """Gets the terminal name with the device prefix. + + Args: + task: Specifies the task to get the device name from. + terminal_name: Specifies the terminal name to get. + + Returns: + Indicates the terminal name with the device prefix. + """ + for device in task.devices: + if device.product_category not in [ + ProductCategory.C_SERIES_MODULE, + ProductCategory.SCXI_MODULE, + ]: + return f"/{device.name}/{terminal_name}" + + return "" def generate_sine_wave( @@ -23,15 +43,15 @@ def generate_sine_wave( """Generates a sine wave with a specified phase. Args: - frequency (float): Specifies the frequency of the sine wave. - amplitude (float): Specifies the amplitude of the sine wave. - sampling_rate (float): Specifies the sampling rate of the sine wave. - number_of_samples (int): Specifies the number of samples to generate. - phase_in (Optional[float]): Specifies the phase of the sine wave in radians. + frequency: Specifies the frequency of the sine wave. + amplitude: Specifies the amplitude of the sine wave. + sampling_rate: Specifies the sampling rate of the sine wave. + number_of_samples: Specifies the number of samples to generate. + phase_in: Specifies the phase of the sine wave in radians. Returns: - Tuple[numpy.typing.NDArray[numpy.double], float]: Indicates a tuple - containing the generated data and the phase of the sine wave after generation. + Indicates a tuple containing the generated data and the phase + of the sine wave after generation. """ duration_time = number_of_samples / sampling_rate duration_radians = duration_time * 2 * np.pi @@ -60,9 +80,11 @@ def callback(task_handle, every_n_samples_event_type, number_of_samples, callbac ai_task.ai_channels.add_ai_voltage_chan("Dev1/ai0") ai_task.timing.cfg_samp_clk_timing(1000.0, sample_mode=AcquisitionType.CONTINUOUS) ai_task.register_every_n_samples_acquired_into_buffer_event(1000, callback) + terminal_name = get_terminal_name_with_dev_prefix(ai_task, "ai/StartTrigger") + ao_task.ao_channels.add_ao_voltage_chan("Dev1/ao0") ao_task.timing.cfg_samp_clk_timing(1000.0, sample_mode=AcquisitionType.CONTINUOUS) - ao_task.triggers.start_trigger.cfg_dig_edge_start_trig("ai/StartTrigger") + ao_task.triggers.start_trigger.cfg_dig_edge_start_trig(terminal_name) actual_sampling_rate = ao_task.timing.samp_clk_rate print(f"Actual sampling rate: {actual_sampling_rate:g} S/s") @@ -74,18 +96,14 @@ def callback(task_handle, every_n_samples_event_type, number_of_samples, callbac number_of_samples=number_of_samples, ) - try: - ao_task.write(ao_data) - ao_task.start() - ai_task.start() + ao_task.write(ao_data) + ao_task.start() + ai_task.start() - input("Acquiring samples continuously. Press Enter to stop.\n") + input("Acquiring samples continuously. Press Enter to stop.\n") - except nidaqmx.DaqError as e: - print(e) - finally: - ai_task.stop() - ao_task.stop() + ai_task.stop() + ao_task.stop() print(f"\nAcquired {total_read} total samples.") diff --git a/examples/synchronization/multi_function/cont_ai_di_acq.py b/examples/synchronization/multi_function/cont_ai_di_acq.py index 98f0cdd8..d2ecfaf5 100644 --- a/examples/synchronization/multi_function/cont_ai_di_acq.py +++ b/examples/synchronization/multi_function/cont_ai_di_acq.py @@ -6,7 +6,27 @@ """ import nidaqmx -from nidaqmx.constants import AcquisitionType, LineGrouping +from nidaqmx.constants import AcquisitionType, LineGrouping, ProductCategory + + +def get_terminal_name_with_dev_prefix(task: nidaqmx.Task, terminal_name: str) -> str: + """Gets the terminal name with the device prefix. + + Args: + task: Specifies the task to get the device name from. + terminal_name: Specifies the terminal name to get. + + Returns: + Indicates the terminal name with the device prefix. + """ + for device in task.devices: + if device.product_category not in [ + ProductCategory.C_SERIES_MODULE, + ProductCategory.SCXI_MODULE, + ]: + return f"/{device.name}/{terminal_name}" + + return "" def main(): @@ -20,8 +40,8 @@ def callback(task_handle, every_n_samples_event_type, number_of_samples, callbac """Callback function for reading signals.""" nonlocal total_ai_read nonlocal total_di_read - ai_read = ai_task.read(number_of_samples_per_channel=1000) - di_read = di_task.read(number_of_samples_per_channel=1000) + ai_read = ai_task.read(number_of_samples_per_channel=number_of_samples) + di_read = di_task.read(number_of_samples_per_channel=number_of_samples) total_ai_read += len(ai_read) total_di_read += len(di_read) print(f"\t{len(ai_read)}\t{len(di_read)}\t\t{total_ai_read}\t{total_di_read}", end="\r") @@ -31,24 +51,22 @@ def callback(task_handle, every_n_samples_event_type, number_of_samples, callbac ai_task.ai_channels.add_ai_voltage_chan("Dev1/ai0") ai_task.timing.cfg_samp_clk_timing(1000.0, sample_mode=AcquisitionType.CONTINUOUS) ai_task.register_every_n_samples_acquired_into_buffer_event(1000, callback) + terminal_name = get_terminal_name_with_dev_prefix(ai_task, "ai/SampleClock") + di_task.di_channels.add_di_chan("Dev1/port0", line_grouping=LineGrouping.CHAN_FOR_ALL_LINES) di_task.timing.cfg_samp_clk_timing( - 1000.0, "ai/SampleClock", sample_mode=AcquisitionType.CONTINUOUS + 1000.0, terminal_name, sample_mode=AcquisitionType.CONTINUOUS ) - try: - di_task.start() - ai_task.start() + di_task.start() + ai_task.start() - print("Acquiring samples continuously. Press Enter to stop.\n") - print("Read:\tAI\tDI\tTotal:\tAI\tDI") - input() + print("Acquiring samples continuously. Press Enter to stop.\n") + print("Read:\tAI\tDI\tTotal:\tAI\tDI") + input() - except nidaqmx.DaqError as e: - print(e) - finally: - ai_task.stop() - di_task.stop() + ai_task.stop() + di_task.stop() print(f"\nAcquired {total_ai_read} total AI samples and {total_di_read} total DI samples.")