Skip to content

Commit

Permalink
Simplifies scanning object to generic CounterAndScanner
Browse files Browse the repository at this point in the history
Allows to be instantiated with an implementation of a RateCounterBase
class. RandomRateCounter and NiDaqDigitalInputRateCounter are subclasses.

Also, CounterAndScanner must be implemented with a stage_controller object that
is a subclass of nipiezojenapy.BaseControl. (This is poor choice,
as we should probably define an interface for a scanner and then
an implementation that supports nipiezojenapy. This would allow for
different scanners to be used in the future. See Issue #79)
  • Loading branch information
gadamc committed Feb 7, 2023
1 parent f5fb7a6 commit 8b7ee45
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 115 deletions.
25 changes: 12 additions & 13 deletions src/applications/piezoscan.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
import time
import argparse
import collections
import tkinter as tk
import tkinter.ttk as ttk
import logging
import datetime
from threading import Thread

import numpy as np
import scipy.optimize
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg, NavigationToolbar2Tk
import matplotlib
matplotlib.use('Agg')
import nidaqmx

import qt3utils.nidaq
Expand All @@ -21,6 +16,8 @@
import qt3utils.pulsers.pulseblaster
import nipiezojenapy

matplotlib.use('Agg')


parser = argparse.ArgumentParser(description='NI DAQ (PCIx 6363) / Jena Piezo Scanner',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
Expand Down Expand Up @@ -86,10 +83,10 @@ def __init__(self, mplcolormap = 'Reds'):
def update(self, model):

if self.log_data:
data = np.log10(model.data)
data = np.log10(model.scanned_count_rate)
data[np.isinf(data)] = 0 #protect against +-inf
else:
data = model.data
data = model.scanned_count_rate

self.artist = self.ax.imshow(data, cmap=self.cmap, extent=[model.xmin,
model.xmax + model.step_size,
Expand Down Expand Up @@ -117,6 +114,7 @@ def onclick(self, event):
#todo: draw a circle around clicked point? Maybe with a high alpha, so that its faint
self.onclick_callback(event)


class SidePanel():
def __init__(self, root, scan_range):
frame = tk.Frame(root)
Expand Down Expand Up @@ -231,7 +229,6 @@ def __init__(self, root, scan_range):
self.log10Button = tk.Button(frame, text="Log10")
self.log10Button.grid(row=row, column=2, pady=(2,15))


def update_go_to_position(self, x = None, y = None, z = None):
if x is not None:
self.go_to_x_position_text.set(np.round(x,4))
Expand Down Expand Up @@ -349,14 +346,14 @@ def go_to_z(self, event = None):
def set_color_map(self, event = None):
#Is there a way for this function to exist entirely in the view code instead of here?
self.view.scan_view.cmap = self.view.sidepanel.mpl_color_map_entry.get()
if len(self.model.data) > 0:
if len(self.model.scanned_count_rate) > 0:
self.view.scan_view.update(self.model)
self.view.canvas.draw()

def log_scan_image(self, event = None):
#Is there a way for this function to exist entirely in the view code instead of here?
self.view.scan_view.log_data = not self.view.scan_view.log_data
if len(self.model.data) > 0:
if len(self.model.scanned_count_rate) > 0:
self.view.scan_view.update(self.model)
self.view.canvas.draw()

Expand Down Expand Up @@ -459,7 +456,7 @@ def save_scan(self, event = None):
if afile is None or afile == '':
return #selection was canceled.
with open(afile, 'wb') as f_object:
np.save(f_object, self.model.data)
np.save(f_object, self.model.scanned_count_rate)

self.view.sidepanel.saveScanButton['state'] = 'normal'

Expand Down Expand Up @@ -531,7 +528,8 @@ def on_closing(self):
def build_data_scanner():
if args.randomtest:
stage_controller = nipiezojenapy.BaseControl()
scanner = datasources.RandomPiezoScanner(stage_controller=stage_controller)
data_acq = datasources.RandomRateCounter(simulate_single_light_source=True,
num_data_samples_per_batch=args.num_data_samples_per_batch)
else:
stage_controller = nipiezojenapy.PiezoControl(device_name = args.daq_name,
write_channels = args.piezo_write_channels.split(','),
Expand All @@ -545,7 +543,7 @@ def build_data_scanner():
args.rwtimeout,
args.signal_counter)

scanner = datasources.NiDaqPiezoScanner(data_acq, stage_controller)
scanner = qt3utils.datagenerators.piezoscanner.CounterAndScanner(data_acq, stage_controller)

return scanner

Expand All @@ -554,5 +552,6 @@ def main():
tkapp = MainTkApplication(build_data_scanner())
tkapp.run()


if __name__ == '__main__':
main()
149 changes: 47 additions & 102 deletions src/qt3utils/datagenerators/piezoscanner.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import abc
import numpy as np
import scipy.optimize
import time
Expand All @@ -10,37 +9,52 @@ def gauss(x, *p):
C, mu, sigma, offset = p
return C*np.exp(-(x-mu)**2/(2.*sigma**2)) + offset

class BasePiezoScanner(abc.ABC):
def __init__(self, stage_controller = None):

self.running = False
class CounterAndScanner:
def __init__(self, rate_counter, stage_controller):

self.running = False
self.current_y = 0
self.ymin = 0.0
self.ymax = 80.0
self.xmin = 0.0
self.xmax = 80.0
self.step_size = 0.5
self.raster_line_pause = 0.0
self.raster_line_pause = 0.150 # wait 150ms for the piezo stage to settle before a line scan

self.data = []
self.stage_controller = stage_controller
self.scanned_raw_counts = []
self.scanned_count_rate = []

self.num_daq_batches = 1 #could change to 10 if want 10x more samples for each position
self.stage_controller = stage_controller
self.rate_counter = rate_counter
self.num_daq_batches = 1 # could change to 10 if want 10x more samples for each position

def stop(self):
self.rate_counter.stop()
self.running = False

def start(self):
self.running = True
self.rate_counter.start()

def set_to_starting_position(self):
self.current_y = self.ymin
if self.stage_controller:
self.stage_controller.go_to_position(x = self.xmin, y = self.ymin)

def close(self):
return
self.rate_counter.close()

def set_num_data_samples_per_batch(self, N):
self.rate_counter.num_data_samples_per_batch = N

def sample_counts(self):
return self.rate_counter.sample_counts(self.num_daq_batches)

def sample_count_rate(self, data_counts=None):
if data_counts is None:
data_counts = self.sample_counts()
return self.rate_counter.sample_count_rate(data_counts)

def set_scan_range(self, xmin, xmax, ymin, ymax):
if self.stage_controller:
Expand Down Expand Up @@ -70,47 +84,40 @@ def move_y(self):
except ValueError as e:
logger.info(f'out of range\n\n{e}')

@abc.abstractmethod
def sample_counts(self):
"""
expectation is to return [[counts, clock_samples], [counts, clock_samples], ...] as is returned by daqsamplers.sample_counts.
def scan_x(self):
"""
pass
Scans the x axis from xmin to xmax in steps of step_size.
@abc.abstractmethod
def sample_count_rate(self):
Stores results in self.scanned_raw_counts and self.scanned_count_rate.
"""
must return a single floating point value
"""
pass

@abc.abstractmethod
def set_num_data_samples_per_batch(self, N):
pass

def scan_x(self):

scan = self.scan_axis('x', self.xmin, self.xmax, self.step_size)
self.data.append(scan)
raw_counts_for_axis = self.scan_axis('x', self.xmin, self.xmax, self.step_size)
self.scanned_raw_counts.append(raw_counts_for_axis)
self.scanned_count_rate.append([self.sample_count_rate(raw_counts) for raw_counts in raw_counts_for_axis])

def scan_axis(self, axis, min, max, step_size):
scan = []
"""
Moves the stage along the specified axis from min to max in steps of step_size.
Returns a list of raw counts from the scan in the shape
[[[counts, clock_samples]], [[counts, clock_samples]], ...] where each [[counts, clock_samples]] is the
result of a single call to sample_counts at each scan position along the axis.
"""
raw_counts = []
self.stage_controller.go_to_position(**{axis:min})
time.sleep(self.raster_line_pause)
for val in np.arange(min, max, step_size):
if self.stage_controller:
logger.info(f'go to position {axis}: {val:.2f}')
self.stage_controller.go_to_position(**{axis:val})
cr = self.sample_count_rate()
scan.append(cr)
logger.info(f'count rate: {cr}')
_raw_counts = self.sample_counts()
raw_counts.append(_raw_counts)
logger.info(f'raw counts, total clock samples: {_raw_counts}')
if self.stage_controller:
logger.info(f'current position: {self.stage_controller.get_current_position()}')

return scan
return raw_counts

def reset(self):
self.data = []
self.scanned_raw_counts = []

def optimize_position(self, axis, center_position, width = 2, step_size = 0.25):
'''
Expand Down Expand Up @@ -147,81 +154,19 @@ def optimize_position(self, axis, center_position, width = 2, step_size = 0.25):
max_val = np.min([max_val, 80.0])

self.start()
data = self.scan_axis(axis, min_val, max_val, step_size)
raw_counts = self.scan_axis(axis, min_val, max_val, step_size)
self.stop()
axis_vals = np.arange(min_val, max_val, step_size)
count_rate = self.sample_count_rate(raw_counts)

optimal_position = axis_vals[np.argmax(data)]
optimal_position = axis_vals[np.argmax(count_rate)]
coeff = None
params = [np.max(data), optimal_position, 1.0, np.min(data)]
params = [np.max(count_rate), optimal_position, 1.0, np.min(count_rate)]
try:
coeff, var_matrix = scipy.optimize.curve_fit(gauss, axis_vals, data, p0=params)
coeff, var_matrix = scipy.optimize.curve_fit(gauss, axis_vals, count_rate, p0=params)
optimal_position = coeff[1]
except RuntimeError as e:
print(e)

return data, axis_vals, optimal_position, coeff

class NiDaqPiezoScanner(BasePiezoScanner):
def __init__(self, nidaqratecounter, stage_controller, num_data_samples_per_batch = 50):
super().__init__(stage_controller)
self.nidaqratecounter = nidaqratecounter
self.raster_line_pause = 0.150 #wait 150ms for the piezo stage to settle before a line scan
self.set_num_data_samples_per_batch(num_data_samples_per_batch)
return count_rate, axis_vals, optimal_position, coeff

def set_num_data_samples_per_batch(self, N):
self.nidaqratecounter.num_data_samples_per_batch = N

def sample_counts(self):
return self.nidaqratecounter.sample_counts(self.num_daq_batches)

def sample_count_rate(self, data_counts=None):
if data_counts is None:
data_counts = self.sample_counts()
return self.nidaqratecounter.sample_count_rate(data_counts)

def stop(self):
self.nidaqratecounter.stop()
super().stop()

def start(self):
super().start()
self.nidaqratecounter.start()

def close(self):
super().close()
self.nidaqratecounter.close()

class RandomPiezoScanner(BasePiezoScanner):
'''
This random scanner acts like it finds bright light sources
at random positions across a scan.
'''
def __init__(self, stage_controller = None):
super().__init__(stage_controller)
self.default_offset = 350
self.signal_noise_amp = 0.2
self.possible_offset_values = np.arange(5000, 100000, 1000) # these create the "bright" positions

self.current_offset = self.default_offset
self.clock_period = 0.09302010 # a totally random number

def set_num_data_samples_per_batch(self, N):
#for the random sampler, there is only one sample per batch. So, we set
#number of batches here
self.num_daq_batches = N


def sample_counts(self):
if np.random.random(1)[0] < 0.005:
self.current_offset = np.random.choice(self.possible_offset_values)
else:
self.current_offset = self.default_offset

return self.signal_noise_amp * self.current_offset * np.random.random(
self.num_daq_batches) + self.current_offset

def sample_count_rate(self, data_counts = None):
if data_counts is None:
data_counts = self.sample_counts()
return np.sum(data_counts) / self.clock_period

0 comments on commit 8b7ee45

Please sign in to comment.