Skip to content

Commit

Permalink
added cpp tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Lham42 committed Sep 24, 2024
1 parent 53793e2 commit 0651887
Showing 1 changed file with 173 additions and 77 deletions.
250 changes: 173 additions & 77 deletions client/cpp/framer/streamer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,93 +13,187 @@
#include "client/cpp/synnax.h"
#include "client/cpp/testutil/testutil.h"

void test_downsample(
std::vector<float> raw_data,
std::vector<float> expected,
int32_t downsample_factor
);
// /// @brief it should correctly receive a frame of streamed telemetry from the DB.
// TEST(FramerTests, testStreamBasic) {
// auto client = new_test_client();
// auto [data, cErr] = client.channels.create(
// "data",
// synnax::FLOAT32,
// 1 * synnax::HZ);
// ASSERT_FALSE(cErr) << cErr.message();
// auto now = synnax::TimeStamp::now();
// std::vector<synnax::ChannelKey> channels = {data.key};
// auto [writer, wErr] = client.telem.openWriter(synnax::WriterConfig{
// channels,
// now,
// std::vector<synnax::Authority>{synnax::AUTH_ABSOLUTE},
// synnax::ControlSubject{"test_writer"}
// });
// ASSERT_FALSE(wErr) << wErr.message();
//
// auto [streamer, sErr] = client.telem.openStreamer(synnax::StreamerConfig{
// channels,
// });
//
// // Sleep for 5 milliseconds to allow for the streamer to bootstrap.
// std::this_thread::sleep_for(std::chrono::milliseconds(5));
//
// auto frame = synnax::Frame(1);
// frame.add(
// data.key,
// synnax::Series(std::vector<std::float_t>{1.0}));
// ASSERT_TRUE(writer.write(std::move(frame)));
// auto [res_frame, recErr] = streamer.read();
// ASSERT_FALSE(recErr) << recErr.message();
//
// ASSERT_EQ(res_frame.size(), 1);
// ASSERT_EQ(res_frame.series->at(0).values<float>()[0], 1.0);
//
// auto wcErr = writer.close();
// ASSERT_FALSE(cErr) << cErr.message();
// auto wsErr = streamer.close();
// ASSERT_FALSE(wsErr) << wsErr.message();
// }
//
// ///@brief test streamer set channels after construction.
// TEST(FramerTests, testStreamSetChannels) {
// auto client = new_test_client();
// auto [data, cErr] = client.channels.create(
// "data",
// synnax::FLOAT32,
// 1 * synnax::HZ);
// ASSERT_FALSE(cErr) << cErr.message();
// auto now = synnax::TimeStamp::now();
// auto [writer, wErr] = client.telem.openWriter(synnax::WriterConfig{
// {data.key},
// now,
// std::vector<synnax::Authority>{synnax::AUTH_ABSOLUTE},
// synnax::ControlSubject{"test_writer"}
// });
// ASSERT_FALSE(wErr) << wErr.message();
//
// auto [streamer, sErr] = client.telem.openStreamer(synnax::StreamerConfig{
// {},
// });
//
// auto setErr = streamer.setChannels({data.key});
// // Sleep for 5 milliseconds to allow for the streamer to process the updated keys.
// std::this_thread::sleep_for(std::chrono::milliseconds(5));
// ASSERT_FALSE(setErr) << setErr.message();
//
// auto frame = synnax::Frame(1);
// frame.add(
// data.key,
// synnax::Series(std::vector<std::float_t>{1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0}));
// ASSERT_TRUE(writer.write(std::move(frame)));
// auto [res_frame, recErr] = streamer.read();
// ASSERT_FALSE(recErr) << recErr.message();
//
// ASSERT_EQ(res_frame.size(), 1);
// ASSERT_EQ(res_frame.series->at(0).values<float>()[0], 1.0);
//
// auto wcErr = writer.close();
// ASSERT_FALSE(cErr) << cErr.message();
// auto wsErr = streamer.close();
// ASSERT_FALSE(wsErr) << wsErr.message();
// }
//
// /// @brief it should correctly receive a frame of streamed telemetry from the DB.
// TEST(FramerTests, TestStreamDownsample) {
// auto client = new_test_client();
// auto [data, cErr] = client.channels.create(
// "data",
// synnax::FLOAT32,
// 1 * synnax::HZ);
// ASSERT_FALSE(cErr) << cErr.message();
// auto now = synnax::TimeStamp::now();
// std::vector<synnax::ChannelKey> channels = {data.key};
// auto [writer, wErr] = client.telem.openWriter(synnax::WriterConfig{
// channels,
// now,
// std::vector<synnax::Authority>{synnax::AUTH_ABSOLUTE},
// synnax::ControlSubject{"test_writer"}
// });
// ASSERT_FALSE(wErr) << wErr.message();
//
// auto [streamer, sErr] = client.telem.openStreamer(synnax::StreamerConfig{
// channels,
// });
//
// // Sleep for 5 milliseconds to allow for the streamer to bootstrap.
// std::this_thread::sleep_for(std::chrono::milliseconds(5));
//
// auto frame = synnax::Frame(1);
// frame.add(
// data.key,
// synnax::Series(std::vector<std::float_t>{1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0}));
// ASSERT_TRUE(writer.write(std::move(frame)));
// auto [res_frame, recErr] = streamer.read();
// ASSERT_FALSE(recErr) << recErr.message();
//
// // print series
// for (auto &series : *res_frame.series) {
// std::cout << "Series size: " << series.values<float>().size() << "\n";
// std::cout << "Series: " << series;
// }
//
// ASSERT_EQ(res_frame.series->at(0).values<float>().size(), 5);
// ASSERT_EQ(res_frame.series->at(0).values<float>()[0], 1.0);
// ASSERT_EQ(res_frame.series->at(0).values<float>()[1], 3.0);
// ASSERT_EQ(res_frame.series->at(0).values<float>()[2], 5.0);
// ASSERT_EQ(res_frame.series->at(0).values<float>()[3], 7.0);
// ASSERT_EQ(res_frame.series->at(0).values<float>()[4], 9.0);
//
// auto wcErr = writer.close();
// ASSERT_FALSE(cErr) << cErr.message();
// auto wsErr = streamer.close();
// ASSERT_FALSE(wsErr) << wsErr.message();
// }
/// @brief it should correctly receive a frame of streamed telemetry from the DB.
TEST(FramerTests, testStreamBasic) {
auto client = new_test_client();
auto [data, cErr] = client.channels.create(
"data",
synnax::FLOAT32,
1 * synnax::HZ);
ASSERT_FALSE(cErr) << cErr.message();
auto now = synnax::TimeStamp::now();
std::vector<synnax::ChannelKey> channels = {data.key};
auto [writer, wErr] = client.telem.openWriter(synnax::WriterConfig{
channels,
now,
std::vector<synnax::Authority>{synnax::AUTH_ABSOLUTE},
synnax::ControlSubject{"test_writer"}
});
ASSERT_FALSE(wErr) << wErr.message();
TEST(FramerTests, TestStreamDownsample) {
std::vector<float> data = {1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0};
std::vector<float> expected = {1.0, 3.0, 5.0, 7.0, 9.0};
test_downsample(data, expected, 2);

auto [streamer, sErr] = client.telem.openStreamer(synnax::StreamerConfig{
channels,
});
expected = {1.0, 4.0, 7.0, 10.0};
test_downsample(data, expected, 3);

// Sleep for 5 milliseconds to allow for the streamer to bootstrap.
std::this_thread::sleep_for(std::chrono::milliseconds(5));
expected = {1.0, 5.0, 9.0};
test_downsample(data, expected, 4);

auto frame = synnax::Frame(1);
frame.add(
data.key,
synnax::Series(std::vector<std::float_t>{1.0}));
ASSERT_TRUE(writer.write(std::move(frame)));
auto [res_frame, recErr] = streamer.read();
ASSERT_FALSE(recErr) << recErr.message();
expected = {1.0, 6.0};
test_downsample(data, expected, 5);

ASSERT_EQ(res_frame.size(), 1);
ASSERT_EQ(res_frame.series->at(0).values<float>()[0], 1.0);
expected = {1.0, 7.0};
test_downsample(data, expected, 6);

auto wcErr = writer.close();
ASSERT_FALSE(cErr) << cErr.message();
auto wsErr = streamer.close();
ASSERT_FALSE(wsErr) << wsErr.message();
}
expected = {1.0, 8.0};
test_downsample(data, expected, 7);

///@brief test streamer set channels after construction.
TEST(FramerTests, testStreamSetChannels) {
auto client = new_test_client();
auto [data, cErr] = client.channels.create(
"data",
synnax::FLOAT32,
1 * synnax::HZ);
ASSERT_FALSE(cErr) << cErr.message();
auto now = synnax::TimeStamp::now();
auto [writer, wErr] = client.telem.openWriter(synnax::WriterConfig{
{data.key},
now,
std::vector<synnax::Authority>{synnax::AUTH_ABSOLUTE},
synnax::ControlSubject{"test_writer"}
});
ASSERT_FALSE(wErr) << wErr.message();
expected = {1.0, 9.0};
test_downsample(data, expected, 8);

auto [streamer, sErr] = client.telem.openStreamer(synnax::StreamerConfig{
{},
});
expected = {1.0, 10.0};
test_downsample(data, expected, 9);

auto setErr = streamer.setChannels({data.key});
// Sleep for 5 milliseconds to allow for the streamer to process the updated keys.
std::this_thread::sleep_for(std::chrono::milliseconds(5));
ASSERT_FALSE(setErr) << setErr.message();
expected = {1.0};
test_downsample(data, expected, 10);

auto frame = synnax::Frame(1);
frame.add(
data.key,
synnax::Series(std::vector<std::float_t>{1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0}));
ASSERT_TRUE(writer.write(std::move(frame)));
auto [res_frame, recErr] = streamer.read();
ASSERT_FALSE(recErr) << recErr.message();

ASSERT_EQ(res_frame.size(), 1);
ASSERT_EQ(res_frame.series->at(0).values<float>()[0], 1.0);
test_downsample(data, data,-1);

auto wcErr = writer.close();
ASSERT_FALSE(cErr) << cErr.message();
auto wsErr = streamer.close();
ASSERT_FALSE(wsErr) << wsErr.message();
test_downsample(data, data,0);
}

/// @brief it should correctly receive a frame of streamed telemetry from the DB.
TEST(FramerTests, TestStreamDownsample) {
void test_downsample(
std::vector<float> raw_data,
std::vector<float> expected,
int32_t downsample_factor
) {
auto client = new_test_client();
auto [data, cErr] = client.channels.create(
"data",
Expand All @@ -118,6 +212,7 @@ TEST(FramerTests, TestStreamDownsample) {

auto [streamer, sErr] = client.telem.openStreamer(synnax::StreamerConfig{
channels,
downsample_factor
});

// Sleep for 5 milliseconds to allow for the streamer to bootstrap.
Expand All @@ -126,13 +221,14 @@ TEST(FramerTests, TestStreamDownsample) {
auto frame = synnax::Frame(1);
frame.add(
data.key,
synnax::Series(std::vector<std::float_t>{1.0}));
synnax::Series(raw_data)
);
ASSERT_TRUE(writer.write(std::move(frame)));
auto [res_frame, recErr] = streamer.read();
ASSERT_FALSE(recErr) << recErr.message();

ASSERT_EQ(res_frame.size(), 1);
ASSERT_EQ(res_frame.series->at(0).values<float>()[0], 1.0);
for (int i = 0; i < expected.size(); i++)
ASSERT_EQ(res_frame.series->at(0).values<float>()[i], expected[i]);

auto wcErr = writer.close();
ASSERT_FALSE(cErr) << cErr.message();
Expand Down

0 comments on commit 0651887

Please sign in to comment.