Skip to content

Commit

Permalink
remove unneccessary logs
Browse files Browse the repository at this point in the history
  • Loading branch information
Lham42 committed Sep 24, 2024
1 parent 0651887 commit 3fa6754
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 138 deletions.
2 changes: 1 addition & 1 deletion client/cpp/framer/framer.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class StreamerConfig {
public:
/// @brief the channels to stream.
std::vector<ChannelKey> channels;
int downsample_factor = 2;
int downsample_factor = 1;
private:
void toProto(api::v1::FrameStreamerRequest &f) const;

Expand Down
1 change: 0 additions & 1 deletion client/cpp/framer/streamer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ using namespace synnax;

void StreamerConfig::toProto(api::v1::FrameStreamerRequest &f) const {
f.mutable_keys()->Add(channels.begin(), channels.end());
std::cout << "downsample_factor: " << downsample_factor << std::endl;
f.set_downsample_factor(downsample_factor);
}

Expand Down
224 changes: 88 additions & 136 deletions client/cpp/framer/streamer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,145 +18,97 @@ void test_downsample(
std::vector<float> expected,
int32_t downsample_factor
);
// /// @brief it should correctly receive a frame of streamed telemetry from the DB.
// TEST(FramerTests, testStreamBasic) {
// auto client = new_test_client();
// auto [data, cErr] = client.channels.create(
// "data",
// synnax::FLOAT32,
// 1 * synnax::HZ);
// ASSERT_FALSE(cErr) << cErr.message();
// auto now = synnax::TimeStamp::now();
// std::vector<synnax::ChannelKey> channels = {data.key};
// auto [writer, wErr] = client.telem.openWriter(synnax::WriterConfig{
// channels,
// now,
// std::vector<synnax::Authority>{synnax::AUTH_ABSOLUTE},
// synnax::ControlSubject{"test_writer"}
// });
// ASSERT_FALSE(wErr) << wErr.message();
//
// auto [streamer, sErr] = client.telem.openStreamer(synnax::StreamerConfig{
// channels,
// });
//
// // Sleep for 5 milliseconds to allow for the streamer to bootstrap.
// std::this_thread::sleep_for(std::chrono::milliseconds(5));
//
// auto frame = synnax::Frame(1);
// frame.add(
// data.key,
// synnax::Series(std::vector<std::float_t>{1.0}));
// ASSERT_TRUE(writer.write(std::move(frame)));
// auto [res_frame, recErr] = streamer.read();
// ASSERT_FALSE(recErr) << recErr.message();
//
// ASSERT_EQ(res_frame.size(), 1);
// ASSERT_EQ(res_frame.series->at(0).values<float>()[0], 1.0);
//
// auto wcErr = writer.close();
// ASSERT_FALSE(cErr) << cErr.message();
// auto wsErr = streamer.close();
// ASSERT_FALSE(wsErr) << wsErr.message();
// }
//
// ///@brief test streamer set channels after construction.
// TEST(FramerTests, testStreamSetChannels) {
// auto client = new_test_client();
// auto [data, cErr] = client.channels.create(
// "data",
// synnax::FLOAT32,
// 1 * synnax::HZ);
// ASSERT_FALSE(cErr) << cErr.message();
// auto now = synnax::TimeStamp::now();
// auto [writer, wErr] = client.telem.openWriter(synnax::WriterConfig{
// {data.key},
// now,
// std::vector<synnax::Authority>{synnax::AUTH_ABSOLUTE},
// synnax::ControlSubject{"test_writer"}
// });
// ASSERT_FALSE(wErr) << wErr.message();
//
// auto [streamer, sErr] = client.telem.openStreamer(synnax::StreamerConfig{
// {},
// });
//
// auto setErr = streamer.setChannels({data.key});
// // Sleep for 5 milliseconds to allow for the streamer to process the updated keys.
// std::this_thread::sleep_for(std::chrono::milliseconds(5));
// ASSERT_FALSE(setErr) << setErr.message();
//
// auto frame = synnax::Frame(1);
// frame.add(
// data.key,
// synnax::Series(std::vector<std::float_t>{1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0}));
// ASSERT_TRUE(writer.write(std::move(frame)));
// auto [res_frame, recErr] = streamer.read();
// ASSERT_FALSE(recErr) << recErr.message();
//
// ASSERT_EQ(res_frame.size(), 1);
// ASSERT_EQ(res_frame.series->at(0).values<float>()[0], 1.0);
//
// auto wcErr = writer.close();
// ASSERT_FALSE(cErr) << cErr.message();
// auto wsErr = streamer.close();
// ASSERT_FALSE(wsErr) << wsErr.message();
// }
//
// /// @brief it should correctly receive a frame of streamed telemetry from the DB.
// TEST(FramerTests, TestStreamDownsample) {
// auto client = new_test_client();
// auto [data, cErr] = client.channels.create(
// "data",
// synnax::FLOAT32,
// 1 * synnax::HZ);
// ASSERT_FALSE(cErr) << cErr.message();
// auto now = synnax::TimeStamp::now();
// std::vector<synnax::ChannelKey> channels = {data.key};
// auto [writer, wErr] = client.telem.openWriter(synnax::WriterConfig{
// channels,
// now,
// std::vector<synnax::Authority>{synnax::AUTH_ABSOLUTE},
// synnax::ControlSubject{"test_writer"}
// });
// ASSERT_FALSE(wErr) << wErr.message();
//
// auto [streamer, sErr] = client.telem.openStreamer(synnax::StreamerConfig{
// channels,
// });
//
// // Sleep for 5 milliseconds to allow for the streamer to bootstrap.
// std::this_thread::sleep_for(std::chrono::milliseconds(5));
//
// auto frame = synnax::Frame(1);
// frame.add(
// data.key,
// synnax::Series(std::vector<std::float_t>{1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0}));
// ASSERT_TRUE(writer.write(std::move(frame)));
// auto [res_frame, recErr] = streamer.read();
// ASSERT_FALSE(recErr) << recErr.message();
//
// // print series
// for (auto &series : *res_frame.series) {
// std::cout << "Series size: " << series.values<float>().size() << "\n";
// std::cout << "Series: " << series;
// }
//
// ASSERT_EQ(res_frame.series->at(0).values<float>().size(), 5);
// ASSERT_EQ(res_frame.series->at(0).values<float>()[0], 1.0);
// ASSERT_EQ(res_frame.series->at(0).values<float>()[1], 3.0);
// ASSERT_EQ(res_frame.series->at(0).values<float>()[2], 5.0);
// ASSERT_EQ(res_frame.series->at(0).values<float>()[3], 7.0);
// ASSERT_EQ(res_frame.series->at(0).values<float>()[4], 9.0);
//
// auto wcErr = writer.close();
// ASSERT_FALSE(cErr) << cErr.message();
// auto wsErr = streamer.close();
// ASSERT_FALSE(wsErr) << wsErr.message();
// }
/// @brief it should correctly receive a frame of streamed telemetry from the DB.
TEST(FramerTests, testStreamBasic) {
auto client = new_test_client();
auto [data, cErr] = client.channels.create(
"data",
synnax::FLOAT32,
1 * synnax::HZ);
ASSERT_FALSE(cErr) << cErr.message();
auto now = synnax::TimeStamp::now();
std::vector<synnax::ChannelKey> channels = {data.key};
auto [writer, wErr] = client.telem.openWriter(synnax::WriterConfig{
channels,
now,
std::vector<synnax::Authority>{synnax::AUTH_ABSOLUTE},
synnax::ControlSubject{"test_writer"}
});
ASSERT_FALSE(wErr) << wErr.message();

auto [streamer, sErr] = client.telem.openStreamer(synnax::StreamerConfig{
channels,
});

// Sleep for 5 milliseconds to allow for the streamer to bootstrap.
std::this_thread::sleep_for(std::chrono::milliseconds(5));

auto frame = synnax::Frame(1);
frame.add(
data.key,
synnax::Series(std::vector<std::float_t>{1.0}));
ASSERT_TRUE(writer.write(std::move(frame)));
auto [res_frame, recErr] = streamer.read();
ASSERT_FALSE(recErr) << recErr.message();

ASSERT_EQ(res_frame.size(), 1);
ASSERT_EQ(res_frame.series->at(0).values<float>()[0], 1.0);

auto wcErr = writer.close();
ASSERT_FALSE(cErr) << cErr.message();
auto wsErr = streamer.close();
ASSERT_FALSE(wsErr) << wsErr.message();
}

///@brief test streamer set channels after construction.
TEST(FramerTests, testStreamSetChannels) {
auto client = new_test_client();
auto [data, cErr] = client.channels.create(
"data",
synnax::FLOAT32,
1 * synnax::HZ);
ASSERT_FALSE(cErr) << cErr.message();
auto now = synnax::TimeStamp::now();
auto [writer, wErr] = client.telem.openWriter(synnax::WriterConfig{
{data.key},
now,
std::vector<synnax::Authority>{synnax::AUTH_ABSOLUTE},
synnax::ControlSubject{"test_writer"}
});
ASSERT_FALSE(wErr) << wErr.message();

auto [streamer, sErr] = client.telem.openStreamer(synnax::StreamerConfig{
{},
});

auto setErr = streamer.setChannels({data.key});
// Sleep for 5 milliseconds to allow for the streamer to process the updated keys.
std::this_thread::sleep_for(std::chrono::milliseconds(5));
ASSERT_FALSE(setErr) << setErr.message();

auto frame = synnax::Frame(1);
frame.add(
data.key,
synnax::Series(std::vector<std::float_t>{1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0}));
ASSERT_TRUE(writer.write(std::move(frame)));
auto [res_frame, recErr] = streamer.read();
ASSERT_FALSE(recErr) << recErr.message();

ASSERT_EQ(res_frame.size(), 1);
ASSERT_EQ(res_frame.series->at(0).values<float>()[0], 1.0);

auto wcErr = writer.close();
ASSERT_FALSE(cErr) << cErr.message();
auto wsErr = streamer.close();
ASSERT_FALSE(wsErr) << wsErr.message();
}

/// @brief it should correctly receive a frame of streamed telemetry from the DB.
TEST(FramerTests, TestStreamDownsample) {
std::vector<float> data = {1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0};

test_downsample(data,data,1);

std::vector<float> expected = {1.0, 3.0, 5.0, 7.0, 9.0};
test_downsample(data, expected, 2);

Expand Down

0 comments on commit 3fa6754

Please sign in to comment.