Skip to content

Commit

Permalink
Merge branch 'rc' into eislam/sy-1120-downsampling
Browse files Browse the repository at this point in the history
  • Loading branch information
Lham42 authored Sep 26, 2024
2 parents 5b510b6 + 034cf41 commit e674116
Show file tree
Hide file tree
Showing 128 changed files with 2,568 additions and 682 deletions.
2 changes: 1 addition & 1 deletion alamos/py/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "alamos"
version = "0.29.0"
version = "0.30.0"
description = ""
authors = ["Emiliano Bonilla <ebonilla@synnaxlabs.com>"]
readme = "README.md"
Expand Down
2 changes: 1 addition & 1 deletion alamos/ts/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@synnaxlabs/alamos",
"version": "0.29.0",
"version": "0.30.0",
"type": "module",
"description": "Distributed instrumentation for Synnax",
"repository": "https://github.com/synnaxlabs/synnax/tree/main/freighter/ts",
Expand Down
4 changes: 3 additions & 1 deletion client/cpp/framer/streamer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ std::pair<Streamer, freighter::Error> FrameClient::openStreamer(
auto req = api::v1::FrameStreamerRequest();
config.toProto(req);
auto exc2 = s->send(req);
return {Streamer(std::move(s)), exc2};
if (exc2) return {Streamer(std::move(s)), exc2};
auto [_, resExc] = s->receive();
return {Streamer(std::move(s)), resExc};
}

Streamer::Streamer(std::unique_ptr<StreamerStream> s) : stream(std::move(s)) {
Expand Down
112 changes: 112 additions & 0 deletions client/py/examples/multi-rate-calculated/calculated_interpolation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# Copyright 2024 Synnax Labs, Inc.
#
# Use of this software is governed by the Business Source License included in the file
# licenses/BSL.txt.
#
# As of the Change Date specified in that file, in accordance with the Business Source
# License, use of this software will be governed by the Apache License, Version 2.0,
# included in the file licenses/APL.txt.

import synnax as sy

"""
This example demonstrates how to calculate the average of two sensor channels that
are being sampled at different rates, using numpy's interpolation function to correctly
align the timestamps of the two channels. This example is more complex than the
'calculate_simple.py' example, and requires more computational resources to run.
This example must be run in conjunction with the 'simulated_daq.py' file in this
directory. Run the 'simulated_daq.py' file first, and then run this file.
"""

# We've logged in via the CLI, so there's no need to provide credentials here.
# See https://docs.synnaxlabs.com/reference/python-client/get-started for more information.
client = sy.Synnax()

# We create a separate index channel to store the timestamps for the calculated values.
# These will store the same timestamps as the raw time channel, but will be used to
# index the calculated values.
derived_time_ch = client.channels.create(
name="derived_time",
is_index=True,
retrieve_if_name_exists=True
)

# We'll store the average of "stream_write_example_data_1" and "stream_write_example_data_2"
# in this channel.
average_example_data_1 = client.channels.create(
name="average_example_data_1",
index=derived_time_ch.key,
data_type=sy.DataType.FLOAT32,
retrieve_if_name_exists=True
)

current_values = dict()

TO_READ = ["time_ch_1", "time_ch_2", "data_ch_1", "data_ch_2"]

import numpy as np


def interpolate(data_ch_1_time, data_ch_1, data_ch_2_time, data_ch_2):
# Start off by converting the data to numpy arrays
data_ch_1_time = np.array(data_ch_1_time)
data_ch_1 = np.array(data_ch_1)
data_ch_2_time = np.array(data_ch_2_time)
data_ch_2 = np.array(data_ch_2)

# Check whether any of the timestamps overlap. If not, we can't interpolate.
start_time = max(data_ch_1_time[0], data_ch_2_time[0])
end_time = min(data_ch_1_time[-1], data_ch_2_time[-1])
if start_time > end_time:
return np.array([]), np.array([]), np.array([])

combined_timestamps = np.unique(
np.concatenate((data_ch_1_time, data_ch_2_time)))
# We only want to interpolate values that are within the range of both channels.
avg_timestamps = combined_timestamps[
(combined_timestamps >= start_time) & (combined_timestamps <= end_time)
]
# Interpolate the values for each channel
sensor1_values_interp = np.interp(
avg_timestamps,
data_ch_1_time,
data_ch_1
)
sensor2_values_interp = np.interp(
avg_timestamps,
data_ch_2_time,
data_ch_2
)
# Return the interpolated values and the timestamps
return sensor1_values_interp, sensor2_values_interp, avg_timestamps


with client.open_writer(
start=sy.TimeStamp.now(),
channels=["derived_time", "average_example_data_1"],
enable_auto_commit=True
) as writer:
with client.open_streamer(TO_READ) as s:
for fr in s:
time = fr["time_ch_1"][-1]
for k, v in fr.items():
current_values[k] = fr[k]
# If we still don't have data yet from all four channels, skip and wait for
# the next frame.
if len(current_values.items()) < 4:
continue
# Interpolate the values for each channel, and get the timestamps for the average
# channel.
sensor_1, sensor_2, time = interpolate(
current_values["time_ch_1"],
current_values["data_ch_1"],
current_values["time_ch_2"],
current_values["data_ch_2"]
)
# This means we have no samples to write
if len(sensor_1) == 0:
continue
# Calculate the average of the two sensors
avg = (sensor_1 + sensor_2) / 2
writer.write({"derived_time": time, "average_example_data_1": avg, })
65 changes: 65 additions & 0 deletions client/py/examples/multi-rate-calculated/calculated_simple.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Copyright 2024 Synnax Labs, Inc.
#
# Use of this software is governed by the Business Source License included in the file
# licenses/BSL.txt.
#
# As of the Change Date specified in that file, in accordance with the Business Source
# License, use of this software will be governed by the Apache License, Version 2.0,
# included in the file licenses/APL.txt.

import synnax as sy

"""
This example demonstrates how to calculate the average of two sensor channels that
are being sampled at different rates. This example uses a naive method that simply grabs
and uses the latest value from each channel. This approach is simple, computationally
inexpensive, and works well when both channels are operating at consistent rates.
Good examples of this are a sensor operating at 100Hz and another at 50Hz.
"""

# We've logged in via the CLI, so there's no need to provide credentials here.
# See https://docs.synnaxlabs.com/reference/python-client/get-started for more information.
client = sy.Synnax()

# We create a separate index channel to store the timestamps for the calculated values.
# These will store the same timestamps as the raw time channel, but will be used to
# index the calculated values.
derived_time_ch = client.channels.create(
name="derived_time",
is_index=True,
retrieve_if_name_exists=True
)

# We'll store the average of "stream_write_example_data_1" and "stream_write_example_data_2"
# in this channel.
average_example_data_1 = client.channels.create(
name="average_example_data_1",
index=derived_time_ch.key,
data_type=sy.DataType.FLOAT32,
retrieve_if_name_exists=True
)

TO_READ = ["time_ch_1", "time_ch_2", "data_ch_1", "data_ch_2"]

# Create a dictionary to store the latest values of each channel.
current_values = dict()

with client.open_writer(
start=sy.TimeStamp.now(),
channels=["derived_time", "average_example_data_1"],
enable_auto_commit=True
) as writer:
with client.open_streamer(TO_READ) as s:
for fr in s:
time = fr["time_ch_1"][-1]
# Store the latest values in state.
for k, v in fr.items():
current_values[k] = fr[k][-1]

# If we don't have values for all channels, skip this iteration.
if len(current_values.items()) < 4:
continue

# Caluclate and write the average.
avg = (current_values["data_ch_1"] + current_values["data_ch_2"]) / 2
writer.write({"derived_time": time, "average_example_data_1": avg})
104 changes: 104 additions & 0 deletions client/py/examples/multi-rate-calculated/simulated_daq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# Copyright 2024 Synnax Labs, Inc.
#
# Use of this software is governed by the Business Source License included in the file
# licenses/BSL.txt.
#
# As of the Change Date specified in that file, in accordance with the Business Source
# License, use of this software will be governed by the Apache License, Version 2.0,
# included in the file licenses/APL.txt.

import time
import numpy as np
import synnax as sy

"""
This example sets up a simulated data acquisition system that writes data to two
channels at different rates (along with their indexes). This example should be run
in conjunction with the 'calculated_interpolation.py' example or the
'calculated_simple.py' example to demonstrate how to calculate derived values from
these channels with different rates.
"""

# We've logged in via the CLI, so there's no need to provide credentials here.
# See https://docs.synnaxlabs.com/reference/python-client/get-started for more information.
client = sy.Synnax()

# Create an index channel that will be used to store our timestamps for the first
# channel, operating at rate 1.
time_ch_1 = client.channels.create(
name="time_ch_1",
is_index=True,
data_type=sy.DataType.TIMESTAMP,
retrieve_if_name_exists=True,
)

# Create a second index channel that will be used to store our timestamps for the second
# channel, operating at rate 2.
time_ch_2 = client.channels.create(
name="time_ch_2",
is_index=True,
data_type=sy.DataType.TIMESTAMP,
retrieve_if_name_exists=True,
)

# Data for the first channel, operating at rate 1.
data_ch_1 = client.channels.create(
name="data_ch_1",
index=time_ch_1.key,
data_type=sy.DataType.FLOAT32,
retrieve_if_name_exists=True,
)

# Data for the second channel, operating at rate 2.
data_ch_2 = client.channels.create(
name="data_ch_2",
index=time_ch_2.key,
data_type=sy.DataType.FLOAT32,
retrieve_if_name_exists=True,
)

# We'll start our write at the current time. This timestamp should be the same as or
# just before the first timestamp we write.
start = sy.TimeStamp.now()

# Set a rough data rate of 20 Hz. This won't be exact because we're sleeping for a
# fixed amount of time, but it's close enough for demonstration purposes.
rough_rate = sy.Loop(sy.Rate.HZ * 30)


# Open the writer as a context manager. This will make sure the writer is properly
# closed when we're done writing. We'll write to both the time and data channels. In
# this example, we provide the keys of the channels we want to write to, but you can
# also provide the names and write that way.
start = sy.TimeStamp.now()
with client.open_writer(
start, [time_ch_1.key, time_ch_2.key, data_ch_1.key, data_ch_2.key],
enable_auto_commit=True
) as writer:
i = 0
while rough_rate.wait():
time = sy.TimeStamp.now()
time_2 = time + sy.TimeSpan.MILLISECOND * 3
# Generate data to write to the first channel.
data_to_write = {
time_ch_1.key: [np.int64(time), np.int64(time_2)],
data_ch_1.key: [
np.float32(np.sin(i / 10)),
np.float32(np.sin((i + 1) / 10))
],
}

# Only write to the second channel every third iteration, so its rate is 10Hz,
# instead of 30Hz.
if i % 3 == 0:
# Generate timestamps at a different time to introduce intentional
# misalignment.
time = sy.TimeStamp.now()
time_2 = time + sy.TimeSpan.MILLISECOND
data_to_write[time_ch_2.key] = [np.int64(time), np.int64(time_2)]
data_to_write[data_ch_2.key] = [np.sin(i / 100), np.sin((i + 1) / 100)]

writer.write(data_to_write)
i += 1

print(sy.TimeSpan.since(start))
6 changes: 2 additions & 4 deletions client/py/examples/stream_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,7 @@

# Set a rough data rate of 20 Hz. This won't be exact because we're sleeping for a
# fixed amount of time, but it's close enough for demonstration purposes.
rough_rate = sy.Rate.HZ * 50

total = 5000
loop = sy.Loop(sy.Rate.HZ * 50)

# Open the writer as a context manager. This will make sure the writer is properly
# closed when we're done writing. We'll write to both the time and data channels. In
Expand All @@ -64,7 +62,7 @@
start, [time_ch.key, data_ch_1.key, data_ch_2.key], enable_auto_commit=True
) as writer:
i = 0
while i < total:
while loop.wait():
# Write the data to the writer
writer.write(
{
Expand Down
2 changes: 1 addition & 1 deletion client/py/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "synnax"
version = "0.29.0"
version = "0.30.0"
description = "Synnax Client Library"
keywords = ["Synnax", "Synnax Python Client"]
authors = ["emiliano bonilla <emilbon99@gmail.com>"]
Expand Down
21 changes: 16 additions & 5 deletions client/py/synnax/control/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from __future__ import annotations

from collections.abc import Callable
from threading import Event
from threading import Event, Lock
from typing import Any, Protocol, overload
from asyncio import create_task, Future

Expand Down Expand Up @@ -281,13 +281,13 @@ def _internal_wait_until(
raise ValueError("First argument to wait_until must be a callable.")
processor = WaitUntil(cond, reverse)
try:
self._receiver.processors.add(processor)
self._receiver.add_processor(processor)
timeout_seconds = (
TimeSpan.from_seconds(timeout).seconds if timeout else None
)
ok = processor.event.wait(timeout=timeout_seconds)
finally:
self._receiver.processors.remove(processor)
self._receiver.remove_processor(processor)
if processor.exc:
raise processor.exc
return ok
Expand Down Expand Up @@ -458,6 +458,7 @@ class _Receiver(AsyncThread):
client: framer.Client
streamer: framer.AsyncStreamer
processors: set[Processor]
processor_lock: Lock
retriever: ChannelRetriever
controller: Controller
startup_ack: Event
Expand All @@ -475,12 +476,22 @@ def __init__(
self.client = client
self.state = dict()
self.controller = controller
self.processor_lock = Lock()
self.startup_ack = Event()
self.processors = set()

def add_processor(self, processor: Processor):
with self.processor_lock:
self.processors.add(processor)

def remove_processor(self, processor: Processor):
with self.processor_lock:
self.processors.remove(processor)

def _process(self):
for p in self.processors:
p.process(self.controller)
with self.processor_lock:
for p in self.processors:
p.process(self.controller)

async def _listen_for_close(self):
await self.shutdown_future
Expand Down
Loading

0 comments on commit e674116

Please sign in to comment.