diff --git a/client/py/examples/multi-rate-calculated/calculated_interpolation.py b/client/py/examples/multi-rate-calculated/calculated_interpolation.py new file mode 100644 index 0000000000..27036b2c21 --- /dev/null +++ b/client/py/examples/multi-rate-calculated/calculated_interpolation.py @@ -0,0 +1,112 @@ +# Copyright 2024 Synnax Labs, Inc. +# +# Use of this software is governed by the Business Source License included in the file +# licenses/BSL.txt. +# +# As of the Change Date specified in that file, in accordance with the Business Source +# License, use of this software will be governed by the Apache License, Version 2.0, +# included in the file licenses/APL.txt. + +import synnax as sy + +""" +This example demonstrates how to calculate the average of two sensor channels that +are being sampled at different rates, using numpy's interpolation function to correctly +align the timestamps of the two channels. This example is more complex than the +'calculate_simple.py' example, and requires more computational resources to run. + +This example must be run in conjunction with the 'simulated_daq.py' file in this +directory. Run the 'simulated_daq.py' file first, and then run this file. +""" + +# We've logged in via the CLI, so there's no need to provide credentials here. +# See https://docs.synnaxlabs.com/reference/python-client/get-started for more information. +client = sy.Synnax() + +# We create a separate index channel to store the timestamps for the calculated values. +# These will store the same timestamps as the raw time channel, but will be used to +# index the calculated values. +derived_time_ch = client.channels.create( + name="derived_time", + is_index=True, + retrieve_if_name_exists=True +) + +# We'll store the average of "stream_write_example_data_1" and "stream_write_example_data_2" +# in this channel. +average_example_data_1 = client.channels.create( + name="average_example_data_1", + index=derived_time_ch.key, + data_type=sy.DataType.FLOAT32, + retrieve_if_name_exists=True +) + +current_values = dict() + +TO_READ = ["time_ch_1", "time_ch_2", "data_ch_1", "data_ch_2"] + +import numpy as np + + +def interpolate(data_ch_1_time, data_ch_1, data_ch_2_time, data_ch_2): + # Start off by converting the data to numpy arrays + data_ch_1_time = np.array(data_ch_1_time) + data_ch_1 = np.array(data_ch_1) + data_ch_2_time = np.array(data_ch_2_time) + data_ch_2 = np.array(data_ch_2) + + # Check whether any of the timestamps overlap. If not, we can't interpolate. + start_time = max(data_ch_1_time[0], data_ch_2_time[0]) + end_time = min(data_ch_1_time[-1], data_ch_2_time[-1]) + if start_time > end_time: + return np.array([]), np.array([]), np.array([]) + + combined_timestamps = np.unique( + np.concatenate((data_ch_1_time, data_ch_2_time))) + # We only want to interpolate values that are within the range of both channels. + avg_timestamps = combined_timestamps[ + (combined_timestamps >= start_time) & (combined_timestamps <= end_time) + ] + # Interpolate the values for each channel + sensor1_values_interp = np.interp( + avg_timestamps, + data_ch_1_time, + data_ch_1 + ) + sensor2_values_interp = np.interp( + avg_timestamps, + data_ch_2_time, + data_ch_2 + ) + # Return the interpolated values and the timestamps + return sensor1_values_interp, sensor2_values_interp, avg_timestamps + + +with client.open_writer( + start=sy.TimeStamp.now(), + channels=["derived_time", "average_example_data_1"], + enable_auto_commit=True +) as writer: + with client.open_streamer(TO_READ) as s: + for fr in s: + time = fr["time_ch_1"][-1] + for k, v in fr.items(): + current_values[k] = fr[k] + # If we still don't have data yet from all four channels, skip and wait for + # the next frame. + if len(current_values.items()) < 4: + continue + # Interpolate the values for each channel, and get the timestamps for the average + # channel. + sensor_1, sensor_2, time = interpolate( + current_values["time_ch_1"], + current_values["data_ch_1"], + current_values["time_ch_2"], + current_values["data_ch_2"] + ) + # This means we have no samples to write + if len(sensor_1) == 0: + continue + # Calculate the average of the two sensors + avg = (sensor_1 + sensor_2) / 2 + writer.write({"derived_time": time, "average_example_data_1": avg, }) diff --git a/client/py/examples/multi-rate-calculated/calculated_simple.py b/client/py/examples/multi-rate-calculated/calculated_simple.py new file mode 100644 index 0000000000..e736a8ea90 --- /dev/null +++ b/client/py/examples/multi-rate-calculated/calculated_simple.py @@ -0,0 +1,65 @@ +# Copyright 2024 Synnax Labs, Inc. +# +# Use of this software is governed by the Business Source License included in the file +# licenses/BSL.txt. +# +# As of the Change Date specified in that file, in accordance with the Business Source +# License, use of this software will be governed by the Apache License, Version 2.0, +# included in the file licenses/APL.txt. + +import synnax as sy + +""" +This example demonstrates how to calculate the average of two sensor channels that +are being sampled at different rates. This example uses a naive method that simply grabs +and uses the latest value from each channel. This approach is simple, computationally +inexpensive, and works well when both channels are operating at consistent rates. +Good examples of this are a sensor operating at 100Hz and another at 50Hz. +""" + +# We've logged in via the CLI, so there's no need to provide credentials here. +# See https://docs.synnaxlabs.com/reference/python-client/get-started for more information. +client = sy.Synnax() + +# We create a separate index channel to store the timestamps for the calculated values. +# These will store the same timestamps as the raw time channel, but will be used to +# index the calculated values. +derived_time_ch = client.channels.create( + name="derived_time", + is_index=True, + retrieve_if_name_exists=True +) + +# We'll store the average of "stream_write_example_data_1" and "stream_write_example_data_2" +# in this channel. +average_example_data_1 = client.channels.create( + name="average_example_data_1", + index=derived_time_ch.key, + data_type=sy.DataType.FLOAT32, + retrieve_if_name_exists=True +) + +TO_READ = ["time_ch_1", "time_ch_2", "data_ch_1", "data_ch_2"] + +# Create a dictionary to store the latest values of each channel. +current_values = dict() + +with client.open_writer( + start=sy.TimeStamp.now(), + channels=["derived_time", "average_example_data_1"], + enable_auto_commit=True +) as writer: + with client.open_streamer(TO_READ) as s: + for fr in s: + time = fr["time_ch_1"][-1] + # Store the latest values in state. + for k, v in fr.items(): + current_values[k] = fr[k][-1] + + # If we don't have values for all channels, skip this iteration. + if len(current_values.items()) < 4: + continue + + # Caluclate and write the average. + avg = (current_values["data_ch_1"] + current_values["data_ch_2"]) / 2 + writer.write({"derived_time": time, "average_example_data_1": avg}) diff --git a/client/py/examples/multi-rate-calculated/simulated_daq.py b/client/py/examples/multi-rate-calculated/simulated_daq.py new file mode 100644 index 0000000000..234e84e4a9 --- /dev/null +++ b/client/py/examples/multi-rate-calculated/simulated_daq.py @@ -0,0 +1,104 @@ +# Copyright 2024 Synnax Labs, Inc. +# +# Use of this software is governed by the Business Source License included in the file +# licenses/BSL.txt. +# +# As of the Change Date specified in that file, in accordance with the Business Source +# License, use of this software will be governed by the Apache License, Version 2.0, +# included in the file licenses/APL.txt. + +import time +import numpy as np +import synnax as sy + +""" +This example sets up a simulated data acquisition system that writes data to two +channels at different rates (along with their indexes). This example should be run +in conjunction with the 'calculated_interpolation.py' example or the +'calculated_simple.py' example to demonstrate how to calculate derived values from +these channels with different rates. +""" + +# We've logged in via the CLI, so there's no need to provide credentials here. +# See https://docs.synnaxlabs.com/reference/python-client/get-started for more information. +client = sy.Synnax() + +# Create an index channel that will be used to store our timestamps for the first +# channel, operating at rate 1. +time_ch_1 = client.channels.create( + name="time_ch_1", + is_index=True, + data_type=sy.DataType.TIMESTAMP, + retrieve_if_name_exists=True, +) + +# Create a second index channel that will be used to store our timestamps for the second +# channel, operating at rate 2. +time_ch_2 = client.channels.create( + name="time_ch_2", + is_index=True, + data_type=sy.DataType.TIMESTAMP, + retrieve_if_name_exists=True, +) + +# Data for the first channel, operating at rate 1. +data_ch_1 = client.channels.create( + name="data_ch_1", + index=time_ch_1.key, + data_type=sy.DataType.FLOAT32, + retrieve_if_name_exists=True, +) + +# Data for the second channel, operating at rate 2. +data_ch_2 = client.channels.create( + name="data_ch_2", + index=time_ch_2.key, + data_type=sy.DataType.FLOAT32, + retrieve_if_name_exists=True, +) + +# We'll start our write at the current time. This timestamp should be the same as or +# just before the first timestamp we write. +start = sy.TimeStamp.now() + +# Set a rough data rate of 20 Hz. This won't be exact because we're sleeping for a +# fixed amount of time, but it's close enough for demonstration purposes. +rough_rate = sy.Loop(sy.Rate.HZ * 30) + + +# Open the writer as a context manager. This will make sure the writer is properly +# closed when we're done writing. We'll write to both the time and data channels. In +# this example, we provide the keys of the channels we want to write to, but you can +# also provide the names and write that way. +start = sy.TimeStamp.now() +with client.open_writer( + start, [time_ch_1.key, time_ch_2.key, data_ch_1.key, data_ch_2.key], + enable_auto_commit=True +) as writer: + i = 0 + while rough_rate.wait(): + time = sy.TimeStamp.now() + time_2 = time + sy.TimeSpan.MILLISECOND * 3 + # Generate data to write to the first channel. + data_to_write = { + time_ch_1.key: [np.int64(time), np.int64(time_2)], + data_ch_1.key: [ + np.float32(np.sin(i / 10)), + np.float32(np.sin((i + 1) / 10)) + ], + } + + # Only write to the second channel every third iteration, so its rate is 10Hz, + # instead of 30Hz. + if i % 3 == 0: + # Generate timestamps at a different time to introduce intentional + # misalignment. + time = sy.TimeStamp.now() + time_2 = time + sy.TimeSpan.MILLISECOND + data_to_write[time_ch_2.key] = [np.int64(time), np.int64(time_2)] + data_to_write[data_ch_2.key] = [np.sin(i / 100), np.sin((i + 1) / 100)] + + writer.write(data_to_write) + i += 1 + +print(sy.TimeSpan.since(start)) diff --git a/client/py/examples/stream_write.py b/client/py/examples/stream_write.py index c78a6941c5..41cd621c64 100644 --- a/client/py/examples/stream_write.py +++ b/client/py/examples/stream_write.py @@ -51,9 +51,7 @@ # Set a rough data rate of 20 Hz. This won't be exact because we're sleeping for a # fixed amount of time, but it's close enough for demonstration purposes. -rough_rate = sy.Rate.HZ * 50 - -total = 5000 +loop = sy.Loop(sy.Rate.HZ * 50) # Open the writer as a context manager. This will make sure the writer is properly # closed when we're done writing. We'll write to both the time and data channels. In @@ -64,7 +62,7 @@ start, [time_ch.key, data_ch_1.key, data_ch_2.key], enable_auto_commit=True ) as writer: i = 0 - while i < total: + while loop.wait(): # Write the data to the writer writer.write( {