Skip to content

Commit

Permalink
[client/py] - added multi rate calculated channel example
Browse files Browse the repository at this point in the history
  • Loading branch information
emilbon99 committed Sep 23, 2024
1 parent 6e64abb commit 37e0fd9
Show file tree
Hide file tree
Showing 4 changed files with 283 additions and 4 deletions.
112 changes: 112 additions & 0 deletions client/py/examples/multi-rate-calculated/calculated_interpolation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# Copyright 2024 Synnax Labs, Inc.
#
# Use of this software is governed by the Business Source License included in the file
# licenses/BSL.txt.
#
# As of the Change Date specified in that file, in accordance with the Business Source
# License, use of this software will be governed by the Apache License, Version 2.0,
# included in the file licenses/APL.txt.

import synnax as sy

"""
This example demonstrates how to calculate the average of two sensor channels that
are being sampled at different rates, using numpy's interpolation function to correctly
align the timestamps of the two channels. This example is more complex than the
'calculate_simple.py' example, and requires more computational resources to run.
This example must be run in conjunction with the 'simulated_daq.py' file in this
directory. Run the 'simulated_daq.py' file first, and then run this file.
"""

# We've logged in via the CLI, so there's no need to provide credentials here.
# See https://docs.synnaxlabs.com/reference/python-client/get-started for more information.
client = sy.Synnax()

# We create a separate index channel to store the timestamps for the calculated values.
# These will store the same timestamps as the raw time channel, but will be used to
# index the calculated values.
derived_time_ch = client.channels.create(
name="derived_time",
is_index=True,
retrieve_if_name_exists=True
)

# We'll store the average of "stream_write_example_data_1" and "stream_write_example_data_2"
# in this channel.
average_example_data_1 = client.channels.create(
name="average_example_data_1",
index=derived_time_ch.key,
data_type=sy.DataType.FLOAT32,
retrieve_if_name_exists=True
)

current_values = dict()

TO_READ = ["time_ch_1", "time_ch_2", "data_ch_1", "data_ch_2"]

import numpy as np


def interpolate(data_ch_1_time, data_ch_1, data_ch_2_time, data_ch_2):
# Start off by converting the data to numpy arrays
data_ch_1_time = np.array(data_ch_1_time)
data_ch_1 = np.array(data_ch_1)
data_ch_2_time = np.array(data_ch_2_time)
data_ch_2 = np.array(data_ch_2)

# Check whether any of the timestamps overlap. If not, we can't interpolate.
start_time = max(data_ch_1_time[0], data_ch_2_time[0])
end_time = min(data_ch_1_time[-1], data_ch_2_time[-1])
if start_time > end_time:
return np.array([]), np.array([]), np.array([])

combined_timestamps = np.unique(
np.concatenate((data_ch_1_time, data_ch_2_time)))
# We only want to interpolate values that are within the range of both channels.
avg_timestamps = combined_timestamps[
(combined_timestamps >= start_time) & (combined_timestamps <= end_time)
]
# Interpolate the values for each channel
sensor1_values_interp = np.interp(
avg_timestamps,
data_ch_1_time,
data_ch_1
)
sensor2_values_interp = np.interp(
avg_timestamps,
data_ch_2_time,
data_ch_2
)
# Return the interpolated values and the timestamps
return sensor1_values_interp, sensor2_values_interp, avg_timestamps


with client.open_writer(
start=sy.TimeStamp.now(),
channels=["derived_time", "average_example_data_1"],
enable_auto_commit=True
) as writer:
with client.open_streamer(TO_READ) as s:
for fr in s:
time = fr["time_ch_1"][-1]
for k, v in fr.items():
current_values[k] = fr[k]
# If we still don't have data yet from all four channels, skip and wait for
# the next frame.
if len(current_values.items()) < 4:
continue
# Interpolate the values for each channel, and get the timestamps for the average
# channel.
sensor_1, sensor_2, time = interpolate(
current_values["time_ch_1"],
current_values["data_ch_1"],
current_values["time_ch_2"],
current_values["data_ch_2"]
)
# This means we have no samples to write
if len(sensor_1) == 0:
continue
# Calculate the average of the two sensors
avg = (sensor_1 + sensor_2) / 2
writer.write({"derived_time": time, "average_example_data_1": avg, })
65 changes: 65 additions & 0 deletions client/py/examples/multi-rate-calculated/calculated_simple.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Copyright 2024 Synnax Labs, Inc.
#
# Use of this software is governed by the Business Source License included in the file
# licenses/BSL.txt.
#
# As of the Change Date specified in that file, in accordance with the Business Source
# License, use of this software will be governed by the Apache License, Version 2.0,
# included in the file licenses/APL.txt.

import synnax as sy

"""
This example demonstrates how to calculate the average of two sensor channels that
are being sampled at different rates. This example uses a naive method that simply grabs
and uses the latest value from each channel. This approach is simple, computationally
inexpensive, and works well when both channels are operating at consistent rates.
Good examples of this are a sensor operating at 100Hz and another at 50Hz.
"""

# We've logged in via the CLI, so there's no need to provide credentials here.
# See https://docs.synnaxlabs.com/reference/python-client/get-started for more information.
client = sy.Synnax()

# We create a separate index channel to store the timestamps for the calculated values.
# These will store the same timestamps as the raw time channel, but will be used to
# index the calculated values.
derived_time_ch = client.channels.create(
name="derived_time",
is_index=True,
retrieve_if_name_exists=True
)

# We'll store the average of "stream_write_example_data_1" and "stream_write_example_data_2"
# in this channel.
average_example_data_1 = client.channels.create(
name="average_example_data_1",
index=derived_time_ch.key,
data_type=sy.DataType.FLOAT32,
retrieve_if_name_exists=True
)

TO_READ = ["time_ch_1", "time_ch_2", "data_ch_1", "data_ch_2"]

# Create a dictionary to store the latest values of each channel.
current_values = dict()

with client.open_writer(
start=sy.TimeStamp.now(),
channels=["derived_time", "average_example_data_1"],
enable_auto_commit=True
) as writer:
with client.open_streamer(TO_READ) as s:
for fr in s:
time = fr["time_ch_1"][-1]
# Store the latest values in state.
for k, v in fr.items():
current_values[k] = fr[k][-1]

# If we don't have values for all channels, skip this iteration.
if len(current_values.items()) < 4:
continue

# Caluclate and write the average.
avg = (current_values["data_ch_1"] + current_values["data_ch_2"]) / 2
writer.write({"derived_time": time, "average_example_data_1": avg})
104 changes: 104 additions & 0 deletions client/py/examples/multi-rate-calculated/simulated_daq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# Copyright 2024 Synnax Labs, Inc.
#
# Use of this software is governed by the Business Source License included in the file
# licenses/BSL.txt.
#
# As of the Change Date specified in that file, in accordance with the Business Source
# License, use of this software will be governed by the Apache License, Version 2.0,
# included in the file licenses/APL.txt.

import time
import numpy as np
import synnax as sy

"""
This example sets up a simulated data acquisition system that writes data to two
channels at different rates (along with their indexes). This example should be run
in conjunction with the 'calculated_interpolation.py' example or the
'calculated_simple.py' example to demonstrate how to calculate derived values from
these channels with different rates.
"""

# We've logged in via the CLI, so there's no need to provide credentials here.
# See https://docs.synnaxlabs.com/reference/python-client/get-started for more information.
client = sy.Synnax()

# Create an index channel that will be used to store our timestamps for the first
# channel, operating at rate 1.
time_ch_1 = client.channels.create(
name="time_ch_1",
is_index=True,
data_type=sy.DataType.TIMESTAMP,
retrieve_if_name_exists=True,
)

# Create a second index channel that will be used to store our timestamps for the second
# channel, operating at rate 2.
time_ch_2 = client.channels.create(
name="time_ch_2",
is_index=True,
data_type=sy.DataType.TIMESTAMP,
retrieve_if_name_exists=True,
)

# Data for the first channel, operating at rate 1.
data_ch_1 = client.channels.create(
name="data_ch_1",
index=time_ch_1.key,
data_type=sy.DataType.FLOAT32,
retrieve_if_name_exists=True,
)

# Data for the second channel, operating at rate 2.
data_ch_2 = client.channels.create(
name="data_ch_2",
index=time_ch_2.key,
data_type=sy.DataType.FLOAT32,
retrieve_if_name_exists=True,
)

# We'll start our write at the current time. This timestamp should be the same as or
# just before the first timestamp we write.
start = sy.TimeStamp.now()

# Set a rough data rate of 20 Hz. This won't be exact because we're sleeping for a
# fixed amount of time, but it's close enough for demonstration purposes.
rough_rate = sy.Loop(sy.Rate.HZ * 30)


# Open the writer as a context manager. This will make sure the writer is properly
# closed when we're done writing. We'll write to both the time and data channels. In
# this example, we provide the keys of the channels we want to write to, but you can
# also provide the names and write that way.
start = sy.TimeStamp.now()
with client.open_writer(
start, [time_ch_1.key, time_ch_2.key, data_ch_1.key, data_ch_2.key],
enable_auto_commit=True
) as writer:
i = 0
while rough_rate.wait():
time = sy.TimeStamp.now()
time_2 = time + sy.TimeSpan.MILLISECOND * 3
# Generate data to write to the first channel.
data_to_write = {
time_ch_1.key: [np.int64(time), np.int64(time_2)],
data_ch_1.key: [
np.float32(np.sin(i / 10)),
np.float32(np.sin((i + 1) / 10))
],
}

# Only write to the second channel every third iteration, so its rate is 10Hz,
# instead of 30Hz.
if i % 3 == 0:
# Generate timestamps at a different time to introduce intentional
# misalignment.
time = sy.TimeStamp.now()
time_2 = time + sy.TimeSpan.MILLISECOND
data_to_write[time_ch_2.key] = [np.int64(time), np.int64(time_2)]
data_to_write[data_ch_2.key] = [np.sin(i / 100), np.sin((i + 1) / 100)]

writer.write(data_to_write)
i += 1

print(sy.TimeSpan.since(start))
6 changes: 2 additions & 4 deletions client/py/examples/stream_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,7 @@

# Set a rough data rate of 20 Hz. This won't be exact because we're sleeping for a
# fixed amount of time, but it's close enough for demonstration purposes.
rough_rate = sy.Rate.HZ * 50

total = 5000
loop = sy.Loop(sy.Rate.HZ * 50)

# Open the writer as a context manager. This will make sure the writer is properly
# closed when we're done writing. We'll write to both the time and data channels. In
Expand All @@ -64,7 +62,7 @@
start, [time_ch.key, data_ch_1.key, data_ch_2.key], enable_auto_commit=True
) as writer:
i = 0
while i < total:
while loop.wait():
# Write the data to the writer
writer.write(
{
Expand Down

0 comments on commit 37e0fd9

Please sign in to comment.