From e5cf08fc47064e79bb67d0ba7530d535b0aa0aae Mon Sep 17 00:00:00 2001 From: G Adam Cox Date: Fri, 3 Feb 2023 11:22:01 -0800 Subject: [PATCH] Simplifies scanning object to generic CounterAndScanner Allows to be instantiated with an implementation of a RateCounterBase class. RandomRateCounter and NiDaqDigitalInputRateCounter are subclasses. Also, CounterAndScanner must be implemented with a stage_controller object that is a subclass of nipiezojenapy.BaseControl. (This is poor choice, as we should probably define an interface for a scanner and then an implementation that supports nipiezojenapy. This would allow for different scanners to be used in the future. See Issue #79) --- src/applications/piezoscan.py | 25 ++-- src/qt3utils/datagenerators/piezoscanner.py | 149 ++++++-------------- 2 files changed, 59 insertions(+), 115 deletions(-) diff --git a/src/applications/piezoscan.py b/src/applications/piezoscan.py index c6517b1b..837be3cc 100644 --- a/src/applications/piezoscan.py +++ b/src/applications/piezoscan.py @@ -1,17 +1,12 @@ -import time import argparse -import collections import tkinter as tk -import tkinter.ttk as ttk import logging from threading import Thread import numpy as np -import scipy.optimize import matplotlib.pyplot as plt from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg, NavigationToolbar2Tk import matplotlib -matplotlib.use('Agg') import nidaqmx import qt3utils.nidaq @@ -20,6 +15,8 @@ import qt3utils.pulsers.pulseblaster import nipiezojenapy +matplotlib.use('Agg') + parser = argparse.ArgumentParser(description='NI DAQ (PCIx 6363) / Jena Piezo Scanner', formatter_class=argparse.ArgumentDefaultsHelpFormatter) @@ -85,10 +82,10 @@ def __init__(self, mplcolormap = 'Reds'): def update(self, model): if self.log_data: - data = np.log10(model.data) + data = np.log10(model.scanned_count_rate) data[np.isinf(data)] = 0 #protect against +-inf else: - data = model.data + data = model.scanned_count_rate self.artist = self.ax.imshow(data, cmap=self.cmap, extent=[model.xmin, model.xmax + model.step_size, @@ -116,6 +113,7 @@ def onclick(self, event): #todo: draw a circle around clicked point? Maybe with a high alpha, so that its faint self.onclick_callback(event) + class SidePanel(): def __init__(self, root, scan_range): frame = tk.Frame(root) @@ -227,7 +225,6 @@ def __init__(self, root, scan_range): self.log10Button = tk.Button(frame, text="Log10") self.log10Button.grid(row=row, column=2, pady=(2,15)) - def update_go_to_position(self, x = None, y = None, z = None): if x is not None: self.go_to_x_position_text.set(np.round(x,4)) @@ -343,14 +340,14 @@ def go_to_z(self, event = None): def set_color_map(self, event = None): #Is there a way for this function to exist entirely in the view code instead of here? self.view.scan_view.cmap = self.view.sidepanel.mpl_color_map_entry.get() - if len(self.model.data) > 0: + if len(self.model.scanned_count_rate) > 0: self.view.scan_view.update(self.model) self.view.canvas.draw() def log_scan_image(self, event = None): #Is there a way for this function to exist entirely in the view code instead of here? self.view.scan_view.log_data = not self.view.scan_view.log_data - if len(self.model.data) > 0: + if len(self.model.scanned_count_rate) > 0: self.view.scan_view.update(self.model) self.view.canvas.draw() @@ -429,7 +426,7 @@ def save_scan(self, event = None): if afile is None or afile == '': return #selection was canceled. with open(afile, 'wb') as f_object: - np.save(f_object, self.model.data) + np.save(f_object, self.model.scanned_count_rate) self.view.sidepanel.saveScanButton['state'] = 'normal' @@ -499,7 +496,8 @@ def on_closing(self): def build_data_scanner(): if args.randomtest: stage_controller = nipiezojenapy.BaseControl() - scanner = datasources.RandomPiezoScanner(stage_controller=stage_controller) + data_acq = datasources.RandomRateCounter(simulate_single_light_source=True, + num_data_samples_per_batch=args.num_data_samples_per_batch) else: stage_controller = nipiezojenapy.PiezoControl(device_name = args.daq_name, write_channels = args.piezo_write_channels.split(','), @@ -513,7 +511,7 @@ def build_data_scanner(): args.rwtimeout, args.signal_counter) - scanner = datasources.NiDaqPiezoScanner(data_acq, stage_controller) + scanner = qt3utils.datagenerators.piezoscanner.CounterAndScanner(data_acq, stage_controller) return scanner @@ -522,5 +520,6 @@ def main(): tkapp = MainTkApplication(build_data_scanner()) tkapp.run() + if __name__ == '__main__': main() diff --git a/src/qt3utils/datagenerators/piezoscanner.py b/src/qt3utils/datagenerators/piezoscanner.py index 44acd101..7f19ac5c 100644 --- a/src/qt3utils/datagenerators/piezoscanner.py +++ b/src/qt3utils/datagenerators/piezoscanner.py @@ -1,4 +1,3 @@ -import abc import numpy as np import scipy.optimize import time @@ -10,29 +9,33 @@ def gauss(x, *p): C, mu, sigma, offset = p return C*np.exp(-(x-mu)**2/(2.*sigma**2)) + offset -class BasePiezoScanner(abc.ABC): - def __init__(self, stage_controller = None): - self.running = False +class CounterAndScanner: + def __init__(self, rate_counter, stage_controller): + self.running = False self.current_y = 0 self.ymin = 0.0 self.ymax = 80.0 self.xmin = 0.0 self.xmax = 80.0 self.step_size = 0.5 - self.raster_line_pause = 0.0 + self.raster_line_pause = 0.150 # wait 150ms for the piezo stage to settle before a line scan - self.data = [] - self.stage_controller = stage_controller + self.scanned_raw_counts = [] + self.scanned_count_rate = [] - self.num_daq_batches = 1 #could change to 10 if want 10x more samples for each position + self.stage_controller = stage_controller + self.rate_counter = rate_counter + self.num_daq_batches = 1 # could change to 10 if want 10x more samples for each position def stop(self): + self.rate_counter.stop() self.running = False def start(self): self.running = True + self.rate_counter.start() def set_to_starting_position(self): self.current_y = self.ymin @@ -40,7 +43,18 @@ def set_to_starting_position(self): self.stage_controller.go_to_position(x = self.xmin, y = self.ymin) def close(self): - return + self.rate_counter.close() + + def set_num_data_samples_per_batch(self, N): + self.rate_counter.num_data_samples_per_batch = N + + def sample_counts(self): + return self.rate_counter.sample_counts(self.num_daq_batches) + + def sample_count_rate(self, data_counts=None): + if data_counts is None: + data_counts = self.sample_counts() + return self.rate_counter.sample_count_rate(data_counts) def set_scan_range(self, xmin, xmax, ymin, ymax): if self.stage_controller: @@ -70,47 +84,40 @@ def move_y(self): except ValueError as e: logger.info(f'out of range\n\n{e}') - @abc.abstractmethod - def sample_counts(self): - """ - expectation is to return [[counts, clock_samples], [counts, clock_samples], ...] as is returned by daqsamplers.sample_counts. + def scan_x(self): """ - pass + Scans the x axis from xmin to xmax in steps of step_size. - @abc.abstractmethod - def sample_count_rate(self): + Stores results in self.scanned_raw_counts and self.scanned_count_rate. """ - must return a single floating point value - """ - pass - - @abc.abstractmethod - def set_num_data_samples_per_batch(self, N): - pass - - def scan_x(self): - - scan = self.scan_axis('x', self.xmin, self.xmax, self.step_size) - self.data.append(scan) + raw_counts_for_axis = self.scan_axis('x', self.xmin, self.xmax, self.step_size) + self.scanned_raw_counts.append(raw_counts_for_axis) + self.scanned_count_rate.append([self.sample_count_rate(raw_counts) for raw_counts in raw_counts_for_axis]) def scan_axis(self, axis, min, max, step_size): - scan = [] + """ + Moves the stage along the specified axis from min to max in steps of step_size. + Returns a list of raw counts from the scan in the shape + [[[counts, clock_samples]], [[counts, clock_samples]], ...] where each [[counts, clock_samples]] is the + result of a single call to sample_counts at each scan position along the axis. + """ + raw_counts = [] self.stage_controller.go_to_position(**{axis:min}) time.sleep(self.raster_line_pause) for val in np.arange(min, max, step_size): if self.stage_controller: logger.info(f'go to position {axis}: {val:.2f}') self.stage_controller.go_to_position(**{axis:val}) - cr = self.sample_count_rate() - scan.append(cr) - logger.info(f'count rate: {cr}') + _raw_counts = self.sample_counts() + raw_counts.append(_raw_counts) + logger.info(f'raw counts, total clock samples: {_raw_counts}') if self.stage_controller: logger.info(f'current position: {self.stage_controller.get_current_position()}') - return scan + return raw_counts def reset(self): - self.data = [] + self.scanned_raw_counts = [] def optimize_position(self, axis, center_position, width = 2, step_size = 0.25): ''' @@ -147,81 +154,19 @@ def optimize_position(self, axis, center_position, width = 2, step_size = 0.25): max_val = np.min([max_val, 80.0]) self.start() - data = self.scan_axis(axis, min_val, max_val, step_size) + raw_counts = self.scan_axis(axis, min_val, max_val, step_size) self.stop() axis_vals = np.arange(min_val, max_val, step_size) + count_rate = self.sample_count_rate(raw_counts) - optimal_position = axis_vals[np.argmax(data)] + optimal_position = axis_vals[np.argmax(count_rate)] coeff = None - params = [np.max(data), optimal_position, 1.0, np.min(data)] + params = [np.max(count_rate), optimal_position, 1.0, np.min(count_rate)] try: - coeff, var_matrix = scipy.optimize.curve_fit(gauss, axis_vals, data, p0=params) + coeff, var_matrix = scipy.optimize.curve_fit(gauss, axis_vals, count_rate, p0=params) optimal_position = coeff[1] except RuntimeError as e: print(e) - return data, axis_vals, optimal_position, coeff - -class NiDaqPiezoScanner(BasePiezoScanner): - def __init__(self, nidaqratecounter, stage_controller, num_data_samples_per_batch = 50): - super().__init__(stage_controller) - self.nidaqratecounter = nidaqratecounter - self.raster_line_pause = 0.150 #wait 150ms for the piezo stage to settle before a line scan - self.set_num_data_samples_per_batch(num_data_samples_per_batch) + return count_rate, axis_vals, optimal_position, coeff - def set_num_data_samples_per_batch(self, N): - self.nidaqratecounter.num_data_samples_per_batch = N - - def sample_counts(self): - return self.nidaqratecounter.sample_counts(self.num_daq_batches) - - def sample_count_rate(self, data_counts=None): - if data_counts is None: - data_counts = self.sample_counts() - return self.nidaqratecounter.sample_count_rate(data_counts) - - def stop(self): - self.nidaqratecounter.stop() - super().stop() - - def start(self): - super().start() - self.nidaqratecounter.start() - - def close(self): - super().close() - self.nidaqratecounter.close() - -class RandomPiezoScanner(BasePiezoScanner): - ''' - This random scanner acts like it finds bright light sources - at random positions across a scan. - ''' - def __init__(self, stage_controller = None): - super().__init__(stage_controller) - self.default_offset = 350 - self.signal_noise_amp = 0.2 - self.possible_offset_values = np.arange(5000, 100000, 1000) # these create the "bright" positions - - self.current_offset = self.default_offset - self.clock_period = 0.09302010 # a totally random number - - def set_num_data_samples_per_batch(self, N): - #for the random sampler, there is only one sample per batch. So, we set - #number of batches here - self.num_daq_batches = N - - - def sample_counts(self): - if np.random.random(1)[0] < 0.005: - self.current_offset = np.random.choice(self.possible_offset_values) - else: - self.current_offset = self.default_offset - - return self.signal_noise_amp * self.current_offset * np.random.random( - self.num_daq_batches) + self.current_offset - - def sample_count_rate(self, data_counts = None): - if data_counts is None: - data_counts = self.sample_counts() - return np.sum(data_counts) / self.clock_period