Skip to content

Commit

Permalink
Puts random sampler and nidaq sampler on equal footing.
Browse files Browse the repository at this point in the history
Moves common methods to base class. Only a _read_samples method
and clock_rate should be implemeneted by subclasses.

removes setting .running attribute in subclasses
  • Loading branch information
gadamc committed Feb 3, 2023
1 parent 08551d4 commit d4c7658
Showing 1 changed file with 106 additions and 91 deletions.
197 changes: 106 additions & 91 deletions src/qt3utils/datagenerators/daqsamplers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@
logger = logging.getLogger(__name__)

class RateCounterBase(abc.ABC):

"""
Subclasses must implement a clock_rate attribute or property.
"""
def __init__(self):
self.clock_rate = 1 # default clock rate
self.running = False

self.clock_rate = 0
def stop(self):
"""
subclasses may override this for custom behavior
Expand All @@ -34,27 +35,92 @@ def close(self):
pass

@abc.abstractmethod
def sample_counts(self, n_samples = 1) -> np.ndarray:
def _read_samples(self):
"""
Should return a numpy array of size n_samples, with each row being
an array (or tuple) of two values, The first value is equal to the number of counts,
and the second value is the number of clock samples that were used to measure the counts.
subclasses must implement this method
Should return total_counts, num_clock_samples
"""
pass

def sample_counts(self, n_batches=1, sum_counts=True):
"""
Performs n_batches of batch reads from _read_samples method.
This is useful when hardware (such as NIDAQ) is pre-configured to acquire a fixed number of samples
and the caller wishes to read more data than the number of samples acquired.
For example, if the NiDAQ is configured to acquire 1000 clock samples, but the caller
wishes to read 10000 samples, then this function may be called with n_batches=10.
For each batch read (of size `num_data_samples_per_batch`), the
total counts are summed. Because it's possible (though unlikely)
for the hardware to return fewer than `num_data_samples_per_batch` measurements,
the actual number of data samples per batch are also recorded.
If sum_counts is False, a numpy array of shape (n_batches, 2) is returned, where
the first element is the sum of the counts, and the second element is
the actual number of clock samples per batch. This may be useful for the caller if
they wish to perform their own averaging or other statistical analysis that may be time dependent.
For example, if `num_data_samples_per_batch` is 5 and n_batches is 3,
(typical values are 100 and 10, 100 and 1, 1000 and 1, etc)
Example, if n_samples = 3
reading counts from the NiDAQ may return
#sample 1
raw_counts_1 = [3,5,4,6,4]
sum_counts_1 = 22
size_counts_1 = 5
(22, 5)
#sample 2
raw_counts_2 = [5,5,7,3,4]
sum_counts_2 = 24
size_counts_2 = 5
(24, 5)
#sample 3
raw_counts_3 = [5,3,5,7]
sum_counts_3 = 20
size_counts_2 = 4
(20, 4)
In this example, the numpy array is of shape (3, 2) and will be
data = [
[22, 5], # 22 counts were observed in 5 clock samples
[22, 5],
[24, 5],
[20, 4] # this data indicates there was an error with data acquisition - 4 clock samples were observed.
[20, 4]
]
If sum_counts is True, then will the total number of counts and total number of
clock samples read will be returned.
np.sum(data, axis=0, keepdims=True).
In the example above, this would be [[66, 14]].
With these data, and knowing the clock_rate, one can easily compute
the count rate. See sample_count_rate.
"""
pass

data = np.zeros((n_batches, 2))
for i in range(n_batches):
data_sample, samples_read = self._read_samples()
if samples_read > 0:
data[i][0] = np.sum(data_sample[:samples_read])
data[i][1] = samples_read
logger.info(f'batch data (sum counts, num clock cycles per batch): {data[i]}')

if sum_counts:
return np.sum(data, axis=0, keepdims=True)
else:
return data

def sample_count_rate(self, data_counts: np.ndarray):
"""
Converts the output of sample_counts to a count rate. Expects data_counts to be a 2d numpy array
of [[counts, clock_samples], [counts, clock_samples], ...] as is returned by sample_counts.
of [[counts, clock_samples], [counts, clock_samples], ...] or a 2d array with one row: [[counts, clock_samples]]
as is returned by sample_counts.
Under normal conditions, will return a single value
Returns the count rate in counts/second = clock_rate * total counts/ total clock_samples)
If the sum of all clock_samples is 0, will return np.nan.
"""
Expand All @@ -64,7 +130,6 @@ def sample_count_rate(self, data_counts: np.ndarray):
else:
return np.nan


def yield_count_rate(self):
while self.running:
count_data = self.sample_counts()
Expand All @@ -78,32 +143,41 @@ class RandomRateCounter(RateCounterBase):
This is similar to a PL source moving in and out of focus.
'''
def __init__(self):
def __init__(self, simulate_single_light_source=False, num_data_samples_per_batch=10):
super().__init__()
self.default_offset = 100
self.signal_noise_amp = 0.2
self.possible_offset_values = np.arange(0, 1000, 50)
self.signal_noise_amp = 0.2

self.current_offset = self.default_offset
self.current_direction = 1
self.running = False
self.clock_rate = 0.9302010 # a totally random number :P
self.simulate_single_light_source = simulate_single_light_source
self.possible_offset_values = np.arange(5000, 100000, 1000) # these create the "bright" positions
self.num_data_samples_per_batch = num_data_samples_per_batch

def sample_counts(self, n_samples = 1):
def _read_samples(self):
"""
Returns a random number of counts
"""
if np.random.random(1)[0] < 0.05:
if np.random.random(1)[0] < 0.1:
self.current_direction = -1 * self.current_direction
self.current_offset += self.current_direction*np.random.choice(self.possible_offset_values)
if self.simulate_single_light_source:
if np.random.random(1)[0] < 0.005:
self.current_offset = np.random.choice(self.possible_offset_values)
else:
self.current_offset = self.default_offset

else:
if np.random.random(1)[0] < 0.05:
if np.random.random(1)[0] < 0.1:
self.current_direction = -1 * self.current_direction
self.current_offset += self.current_direction*np.random.choice(self.possible_offset_values)

if self.current_offset < self.default_offset:
self.current_offset = self.default_offset
self.current_direction = 1
if self.current_offset < self.default_offset:
self.current_offset = self.default_offset
self.current_direction = 1

counts = self.signal_noise_amp*self.current_offset*np.random.random(n_samples) + self.current_offset
count_size = np.ones(n_samples)
return np.column_stack((counts, count_size))
counts = self.signal_noise_amp * self.current_offset * np.random.random(self.num_data_samples_per_batch) + self.current_offset

return counts, self.num_data_samples_per_batch


class NiDaqDigitalInputRateCounter(RateCounterBase):
Expand All @@ -126,7 +200,6 @@ def __init__(self, daq_name = 'Dev1',
self.read_write_timeout = read_write_timeout
self.num_data_samples_per_batch = num_data_samples_per_batch
self.trigger_terminal = trigger_terminal
self.running = False

self.read_lock = False

Expand Down Expand Up @@ -188,7 +261,6 @@ def _read_samples(self):
self.read_lock = False
return data_buffer, samples_read


def start(self):
if self.running:
self.stop()
Expand All @@ -208,76 +280,19 @@ def _burn_and_log_exception(self, f):
def stop(self):
if self.running:
while self.read_lock:
time.sleep(0.1) #wait for current read to complete
time.sleep(0.1) # wait for current read to complete

if self.nidaq_config.clock_task:
self._burn_and_log_exception(self.nidaq_config.clock_task.stop)
self._burn_and_log_exception(self.nidaq_config.clock_task.close) #close the task to free resource on NIDAQ
#self._burn_and_log_exception(self.nidaq_config.counter_task.stop) #will need to stop task if we move to continuous buffered acquisition
self._burn_and_log_exception(self.nidaq_config.clock_task.close) # close the task to free resource on NIDAQ
# self._burn_and_log_exception(self.nidaq_config.counter_task.stop) # will need to stop task if we move to continuous buffered acquisition
self._burn_and_log_exception(self.nidaq_config.counter_task.close)

self.running = False

def close(self):
self.stop()

def sample_counts(self, n_samples = 1):
'''
Performs n_samples of batch reads from the NiDAQ.
For each batch read (of size `num_data_samples_per_batch`), the
total counts are summed. Additionally, because it's possible (though unlikely)
for the NiDAQ to return fewer than `num_data_samples_per_batch` measurements,
the actual number of data samples per batch are also recorded.
Finally, a numpy array of shape (n_samples, 2) is returned, where
the first element is the sum of the counts, and the second element is
the actual number of data samples per batch.
For example, if `num_data_samples_per_batch` is 5 and n_samples is 3,
(typical values are 100 and 10, 100 and 1, 1000 and 1, etc)
reading counts from the NiDAQ may return
#sample 1
raw_counts_1 = [3,5,4,6,4]
sum_counts_1 = 22
size_counts_1 = 5
(22, 5)
#sample 2
raw_counts_2 = [5,5,7,3,4]
sum_counts_2 = 24
size_counts_2 = 5
(24, 5)
#sample 3
raw_counts_3 = [5,3,5,7]
sum_counts_3 = 20
size_counts_2 = 4
(20, 4)
In this example, the numpy array is of shape (3, 2) and will be
data = [
[22, 5],
[24, 5],
[20, 4]
]
With these data, and knowing the clock_rate, one can easily compute
the count rate

#removes rows where num samples per batch were zero (which would be a bug in the code)
data = data[np.where(data[:,1] > 0)]

#count rate is the mean counts per clock cycle multiplied by the clock rate.
count_rate = clock_rate * data[:,0]/data[:,1]
'''

data = np.zeros((n_samples, 2))
for i in range(n_samples):
data_sample, samples_read = self._read_samples()
if samples_read > 0:
data[i][0] = np.sum(data_sample[:samples_read])
data[i][1] = samples_read
logger.info(f'batch data (sum counts, num clock cycles per batch): {data[i]}')
return data

0 comments on commit d4c7658

Please sign in to comment.