Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SY-1120 Server-Side Downsampling #836

Open
wants to merge 42 commits into
base: rc
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
0e197cb
move services into service directory
Lham42 Sep 19, 2024
7fb2828
fix ide refactor issues
Lham42 Sep 19, 2024
5440554
remove unnecessary aliases
Lham42 Sep 19, 2024
7caacf5
missing refactors
Lham42 Sep 19, 2024
c926228
change workflows
Lham42 Sep 19, 2024
ef3da63
add franer service
Lham42 Sep 19, 2024
17e3da4
forgot service.go
Lham42 Sep 19, 2024
2625186
missed some changes in start.go
Lham42 Sep 19, 2024
d45ba70
add downsampler confluence segment
Lham42 Sep 19, 2024
392dd0b
checkpoint
Lham42 Sep 20, 2024
b1baef3
checkpoint
Lham42 Sep 20, 2024
ec48734
fixed server panic because didnt start flow for dowensampler
Lham42 Sep 20, 2024
1877b7a
make downsample one word
Lham42 Sep 20, 2024
bbc4aa6
checkpoint
Lham42 Sep 20, 2024
d5331f3
checkpoint - still panics tho
Lham42 Sep 21, 2024
ca8a16d
add python tests that pass
Lham42 Sep 22, 2024
20d540b
fixed tests
Lham42 Sep 23, 2024
0878998
[py/cli] add tests for async streamer
Lham42 Sep 23, 2024
e80d70d
checkpoint
Lham42 Sep 23, 2024
2b2a311
remove unncecessary logs
Lham42 Sep 23, 2024
b9e1c15
add downsample factor to ts client
Lham42 Sep 23, 2024
53793e2
have a wip cpp test running
Lham42 Sep 24, 2024
0651887
added cpp tests
Lham42 Sep 24, 2024
3fa6754
remove unneccessary logs
Lham42 Sep 24, 2024
16635a1
remove unnecessary downsample factor from readframe adapter
Lham42 Sep 24, 2024
2e0d7a0
remove unnecessary log
Lham42 Sep 24, 2024
ea8491d
pr touchups
Lham42 Sep 24, 2024
11e7d35
make downsample factor optional in ts
Lham42 Sep 24, 2024
cdc8cae
[docs] added stream optional arguements
Lham42 Sep 24, 2024
5f27200
broke stream page
Lham42 Sep 24, 2024
230c3a5
[docs] undo docs
Lham42 Sep 24, 2024
cffe7f2
unbroke the stream page
Lham42 Sep 24, 2024
186c582
test
Lham42 Sep 25, 2024
fc4e7a8
remember to include import
Lham42 Sep 25, 2024
bd20120
fixed import
Lham42 Sep 25, 2024
d2c08fc
update docs to be accurate
Lham42 Sep 25, 2024
76e07df
a word
Lham42 Sep 25, 2024
5b510b6
pr changes
Lham42 Sep 26, 2024
e674116
Merge branch 'rc' into eislam/sy-1120-downsampling
Lham42 Sep 26, 2024
cc16e55
missed a comma
Lham42 Sep 26, 2024
f0be03e
missed )};
Lham42 Sep 26, 2024
01d043e
change downsampled streamer to streamer
Lham42 Sep 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/deploy.synnax.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,11 @@ jobs:

- name: Move Driver
if: needs.setup.outputs.changed == 'true'
run: mv bazel-bin/driver/driver_main${{ steps.executable.outputs.EXECUTABLE }} synnax/pkg/hardware/embedded/assets/
run: mv bazel-bin/driver/driver_main${{ steps.executable.outputs.EXECUTABLE }} synnax/pkg/service/hardware/embedded/assets/

- name: Rename Driver
if: needs.setup.outputs.changed == 'true'
run: mv synnax/pkg/hardware/embedded/assets/driver_main${{ steps.executable.outputs.EXECUTABLE }} synnax/pkg/hardware/embedded/assets/driver${{ steps.executable.outputs.EXECUTABLE }}
run: mv synnax/pkg/service/hardware/embedded/assets/driver_main${{ steps.executable.outputs.EXECUTABLE }} synnax/pkg/service/hardware/embedded/assets/driver${{ steps.executable.outputs.EXECUTABLE }}


- name: Set up Go
Expand Down
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ landing/

synnax-data/**

synnax/pkg/hardware/embedded/assets/**
!synnax/pkg/hardware/embedded/assets/.gitkeep
synnax/pkg/service/hardware/embedded/assets/**
!synnax/pkg/service/hardware/embedded/assets/.gitkeep

# |||||| BAZEL ||||||
# Ignore all bazel-* symlinks. There is no full list since this can change
Expand Down
16 changes: 16 additions & 0 deletions client/cpp/framer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,19 @@ cc_test(
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name ="streamer_test",
srcs = [
"streamer_test.cpp"
],
copts = select({
"@platforms//os:windows": ["/std:c++20"],
"//conditions:default": [],
}),
deps = [
"//client/cpp:synnax",
"//client/cpp/testutil",
"@com_google_googletest//:gtest_main",
],
)
1 change: 1 addition & 0 deletions client/cpp/framer/framer.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ class StreamerConfig {
public:
/// @brief the channels to stream.
std::vector<ChannelKey> channels;
int downsample_factor = 1;
private:
void toProto(api::v1::FrameStreamerRequest &f) const;

Expand Down
1 change: 1 addition & 0 deletions client/cpp/framer/streamer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ using namespace synnax;

void StreamerConfig::toProto(api::v1::FrameStreamerRequest &f) const {
f.mutable_keys()->Add(channels.begin(), channels.end());
f.set_downsample_factor(downsample_factor);
}

std::pair<Streamer, freighter::Error> FrameClient::openStreamer(
Expand Down
93 changes: 91 additions & 2 deletions client/cpp/framer/streamer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@
#include "client/cpp/synnax.h"
#include "client/cpp/testutil/testutil.h"

void test_downsample(
std::vector<float> raw_data,
std::vector<float> expected,
int32_t downsample_factor
);
/// @brief it should correctly receive a frame of streamed telemetry from the DB.
TEST(FramerTests, testStreamBasic) {
auto client = new_test_client();
Expand All @@ -35,7 +40,6 @@ TEST(FramerTests, testStreamBasic) {
channels,
});


// Sleep for 5 milliseconds to allow for the streamer to bootstrap.
std::this_thread::sleep_for(std::chrono::milliseconds(5));

Expand Down Expand Up @@ -85,7 +89,7 @@ TEST(FramerTests, testStreamSetChannels) {
auto frame = synnax::Frame(1);
frame.add(
data.key,
synnax::Series(std::vector<std::float_t>{1.0}));
synnax::Series(std::vector<std::float_t>{1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0}));
ASSERT_TRUE(writer.write(std::move(frame)));
auto [res_frame, recErr] = streamer.read();
ASSERT_FALSE(recErr) << recErr.message();
Expand All @@ -98,3 +102,88 @@ TEST(FramerTests, testStreamSetChannels) {
auto wsErr = streamer.close();
ASSERT_FALSE(wsErr) << wsErr.message();
}

/// @brief it should correctly receive a frame of streamed telemetry from the DB.
TEST(FramerTests, TestStreamDownsample) {
std::vector<float> data = {1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0};

test_downsample(data,data,1);

std::vector<float> expected = {1.0, 3.0, 5.0, 7.0, 9.0};
test_downsample(data, expected, 2);

expected = {1.0, 4.0, 7.0, 10.0};
test_downsample(data, expected, 3);

expected = {1.0, 5.0, 9.0};
test_downsample(data, expected, 4);

expected = {1.0, 6.0};
test_downsample(data, expected, 5);

expected = {1.0, 7.0};
test_downsample(data, expected, 6);

expected = {1.0, 8.0};
test_downsample(data, expected, 7);

expected = {1.0, 9.0};
test_downsample(data, expected, 8);

expected = {1.0, 10.0};
test_downsample(data, expected, 9);

expected = {1.0};
test_downsample(data, expected, 10);

test_downsample(data, data,-1);

test_downsample(data, data,0);
}

void test_downsample(
std::vector<float> raw_data,
std::vector<float> expected,
int32_t downsample_factor
) {
auto client = new_test_client();
auto [data, cErr] = client.channels.create(
"data",
synnax::FLOAT32,
1 * synnax::HZ);
ASSERT_FALSE(cErr) << cErr.message();
auto now = synnax::TimeStamp::now();
std::vector<synnax::ChannelKey> channels = {data.key};
auto [writer, wErr] = client.telem.openWriter(synnax::WriterConfig{
channels,
now,
std::vector<synnax::Authority>{synnax::AUTH_ABSOLUTE},
synnax::ControlSubject{"test_writer"}
});
ASSERT_FALSE(wErr) << wErr.message();

auto [streamer, sErr] = client.telem.openStreamer(synnax::StreamerConfig{
channels,
downsample_factor
});

// Sleep for 5 milliseconds to allow for the streamer to bootstrap.
std::this_thread::sleep_for(std::chrono::milliseconds(5));

auto frame = synnax::Frame(1);
frame.add(
data.key,
synnax::Series(raw_data)
);
ASSERT_TRUE(writer.write(std::move(frame)));
auto [res_frame, recErr] = streamer.read();
ASSERT_FALSE(recErr) << recErr.message();

for (int i = 0; i < expected.size(); i++)
ASSERT_EQ(res_frame.series->at(0).values<float>()[i], expected[i]);

auto wcErr = writer.close();
ASSERT_FALSE(cErr) << cErr.message();
auto wsErr = streamer.close();
ASSERT_FALSE(wsErr) << wsErr.message();
}
2 changes: 1 addition & 1 deletion client/py/synnax/channel/payload.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
ChannelKeys = list[int]
ChannelNames = list[str]
ChannelParams = ChannelKeys | ChannelNames | ChannelKey | ChannelName

DownsampleFactor = int

class ChannelPayload(Payload):
"""A payload container that represent the properties of a channel exchanged to and
Expand Down
1 change: 0 additions & 1 deletion client/py/synnax/framer/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from synnax.framer.frame import Frame, CrudeFrame
from synnax.telem.series import CrudeSeries, Series


class ReadFrameAdapter:
__adapter: dict[ChannelKey, ChannelName] | None
retriever: ChannelRetriever
Expand Down
13 changes: 10 additions & 3 deletions client/py/synnax/framer/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,24 +258,31 @@ def read(
)
return series

def open_streamer(self, channels: ChannelParams) -> Streamer:
def open_streamer(self, channels: ChannelParams, downsampleFactor: int = 1) -> Streamer:
"""Opens a new streamer on the given channels. The streamer will immediately
being receiving frames of data from the given channels.

:param channels: The channels to stream from. This can be a single channel name,
a list of channel names, a single channel key, or a list of channel keys.

:param downsampleFactor: The downsample factor to use for the streamer.
"""
adapter = ReadFrameAdapter(self.__channels)
adapter.update(channels)
return Streamer(
adapter=adapter,
client=self.__stream_client,
downsample_factor=downsampleFactor,
)

async def open_async_streamer(self, channels: ChannelParams) -> AsyncStreamer:
async def open_async_streamer(self, channels: ChannelParams, downsampleFactor: int = 1) -> AsyncStreamer:
adapter = ReadFrameAdapter(self.__channels)
adapter.update(channels)
s = AsyncStreamer(adapter=adapter, client=self.__async_client)
s = AsyncStreamer(
adapter=adapter,
client=self.__async_client,
downsample_factor=downsampleFactor,
)
await s._open()
return s

Expand Down
24 changes: 17 additions & 7 deletions client/py/synnax/framer/streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
StreamClient,
)

from synnax.channel.payload import ChannelKeys, ChannelParams
from synnax.channel.payload import ChannelKeys, ChannelParams,DownsampleFactor
from synnax.exceptions import UnexpectedError
from synnax.framer.adapter import ReadFrameAdapter
from synnax.framer.frame import Frame, FramePayload
Expand All @@ -27,7 +27,7 @@

class _Request(Payload):
keys: ChannelKeys

downsample_factor: DownsampleFactor

class _Response(Payload):
frame: FramePayload
Expand Down Expand Up @@ -63,18 +63,19 @@ def __init__(
self,
client: StreamClient,
adapter: ReadFrameAdapter,
downsample_factor: DownsampleFactor,
) -> None:
self._stream = client.stream(_ENDPOINT, _Request, _Response)
self._adapter = adapter
self._downsample_factor = downsample_factor
self.__open()

def __open(self):
self._stream.send(_Request(keys=self._adapter.keys))
self._stream.send(_Request(keys=self._adapter.keys, downsample_factor=self._downsample_factor))
_, exc = self._stream.receive()
if exc is not None:
raise exc


@overload
def read(self, timeout: float | int | TimeSpan) -> Frame | None:
"""Reads the next frame of telemetry from the streamer with a timeout. If no
Expand Down Expand Up @@ -127,7 +128,7 @@ def update_channels(self, channels: ChannelParams):
:raises NotFoundError: If any of the channels in the list are not found.
"""
self._adapter.update(channels)
self._stream.send(_Request(keys=self._adapter.keys))
self._stream.send(_Request(keys=self._adapter.keys, downsample_factor=self._downsample_factor))

def close(self, timeout: float | int | TimeSpan | None = None):
"""Closes the streamer and frees all network resources.
Expand Down Expand Up @@ -198,13 +199,22 @@ def __init__(
self,
client: AsyncStreamClient,
adapter: ReadFrameAdapter,
downsample_factor: DownsampleFactor,
) -> None:
self._client = client
self._adapter = adapter
self._downsample_factor = downsample_factor

async def _open(self):
self._stream = await self._client.stream(_ENDPOINT, _Request, _Response)
await self._stream.send(_Request(keys=self._adapter.keys))
self._stream = await self._client.stream(
_ENDPOINT,
_Request,
_Response
)
await self._stream.send(_Request(
keys=self._adapter.keys,
downsample_factor=self._downsample_factor,
))
_, exc = await self._stream.receive()
if exc is not None:
raise exc
Expand Down
71 changes: 70 additions & 1 deletion client/py/tests/test_framer.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,18 +437,85 @@ def test_timeout_timespan(self, channel: sy.Channel, client: sy.Synnax):
assert f is None
assert abs(TimeSpan.since(start).seconds - 0.1) < 0.05

def test_downsample(self, channel: sy.Channel, client: sy.Synnax):
"""Should correctly stream data for a channel"""
with client.open_streamer(channel.key, 1) as s:
with client.open_writer(sy.TimeStamp.now(), channel.key) as w:
data = np.random.rand(10).astype(np.float64)
w.write(pd.DataFrame({channel.key: data}))
frame = s.read(timeout=1)
assert all(frame[channel.key] == data)
with client.open_streamer(channel.key, 2) as s:
with client.open_writer(sy.TimeStamp.now(), channel.key) as w:
data = [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0]
expect = [1.0,3.0,5.0,7.0,9.0]
w.write(pd.DataFrame({channel.key: data}))
frame = s.read(timeout=1)
assert all(frame[channel.key] == expect)
with client.open_streamer(channel.key, 10) as s:
with client.open_writer(sy.TimeStamp.now(), channel.key) as w:
data = [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0]
expect = [1.0]
w.write(pd.DataFrame({channel.key: data}))
frame = s.read(timeout=1)
assert all(frame[channel.key] == expect)
with client.open_streamer(channel.key, 20) as s:
with client.open_writer(sy.TimeStamp.now(), channel.key) as w:
data = [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0]
expect = [1.0]
w.write(pd.DataFrame({channel.key: data}))
frame = s.read(timeout=1)
assert all(frame[channel.key] == expect)
with client.open_streamer(channel.key, -1) as s:
with client.open_writer(sy.TimeStamp.now(), channel.key) as w:
data = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]
w.write(pd.DataFrame({channel.key: data}))
frame = s.read(timeout=1)
assert all(frame[channel.key] == data)


@pytest.mark.framer
class TestAsyncStreamer:
@pytest.mark.asyncio
async def test_basic_stream(self, channel: sy.Channel, client: sy.Synnax):
with client.open_writer(sy.TimeStamp.now(), channel.key) as w:
async with await client.open_async_streamer(channel.key) as s:
async with await client.open_async_streamer(channel.key,1) as s:
time.sleep(0.1)
data = np.random.rand(10).astype(np.float64)
w.write(pd.DataFrame({channel.key: data}))
frame = await s.read()
assert all(frame[channel.key] == data)
with client.open_writer(sy.TimeStamp.now(), channel.key) as w:
async with await client.open_async_streamer(channel.key, 2) as s:
time.sleep(0.1)
data = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]
expect = [1.0, 3.0, 5.0, 7.0, 9.0]
w.write(pd.DataFrame({channel.key: data}))
frame = await s.read()
assert all(frame[channel.key] == expect)
with client.open_writer(sy.TimeStamp.now(), channel.key) as w:
async with await client.open_async_streamer(channel.key, 10) as s:
time.sleep(0.1)
data = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]
expect = [1.0]
w.write(pd.DataFrame({channel.key: data}))
frame = await s.read()
assert all(frame[channel.key] == expect)
with client.open_writer(sy.TimeStamp.now(), channel.key) as w:
async with await client.open_async_streamer(channel.key, 20) as s:
time.sleep(0.1)
data = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]
expect = [1.0]
w.write(pd.DataFrame({channel.key: data}))
frame = await s.read()
assert all(frame[channel.key] == expect)
with client.open_writer(sy.TimeStamp.now(), channel.key) as w:
async with await client.open_async_streamer(channel.key, -1) as s:
time.sleep(0.1)
data = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]
w.write(pd.DataFrame({channel.key: data}))
frame = await s.read()
assert all(frame[channel.key] == data)


@pytest.mark.framer
Expand Down Expand Up @@ -546,3 +613,5 @@ def test_delete_index_alone(self, client: sy.Synnax):
TimeStamp(1 * TimeSpan.SECOND).range(TimeStamp(2 * TimeSpan.SECOND))
),
)


Loading
Loading