diff --git a/lib/core/mqtt_topics.hpp b/lib/core/mqtt_topics.hpp index f959a29d..435095c2 100644 --- a/lib/core/mqtt_topics.hpp +++ b/lib/core/mqtt_topics.hpp @@ -15,7 +15,9 @@ enum class MqttTopic { kDisplacement, kVelocity, kAcceleration, - kLogs + kLogs, + kLatencyRequest, + kLatencyResponse }; const std::unordered_map mqtt_topic_to_string @@ -27,7 +29,9 @@ const std::unordered_map mqtt_topic_to_string {MqttTopic::kDisplacement, "hyped/cart_2024/navigation/displacement"}, {MqttTopic::kVelocity, "hyped/cart_2024/navigation/velocity"}, {MqttTopic::kAcceleration, "hyped/cart_2024/navigation/acceleration"}, - {MqttTopic::kLogs, "hyped/cart_2024/logs"}}; + {MqttTopic::kLogs, "hyped/cart_2024/logs"}, + {MqttTopic::kLatencyRequest, "hyped/cart_2024/latency/request"}, + {MqttTopic::kLatencyResponse, "hyped/cart_2024/latency/response"}}; const std::unordered_map mqtt_string_to_topic = {{"hyped/cart_2024/state/state", MqttTopic::kState}, @@ -38,6 +42,8 @@ const std::unordered_map mqtt_string_to_topic {"hyped/cart_2024/navigation/displacement", MqttTopic::kDisplacement}, {"hyped/cart_2024/navigation/velocity", MqttTopic::kVelocity}, {"hyped/cart_2024/navigation/acceleration", MqttTopic::kAcceleration}, - {"hyped/cart_2024/logs", MqttTopic::kLogs}}; + {"hyped/cart_2024/logs", MqttTopic::kLogs}, + {"hyped/cart_2024/latency/request", MqttTopic::kLatencyRequest}, + {"hyped/cart_2024/latency/response", MqttTopic::kLatencyResponse}}; } // namespace hyped::core diff --git a/lib/telemetry/latency.cpp b/lib/telemetry/latency.cpp new file mode 100644 index 00000000..af88489a --- /dev/null +++ b/lib/telemetry/latency.cpp @@ -0,0 +1,46 @@ +#include "latency.hpp" + +#include + +namespace hyped::telemetry { + +Latency::Latency(std::shared_ptr mqtt) +{ + mqtt_->subscribe(core::MqttTopic::kStateRequest); +} + +void Latency::respond() +{ + const auto nextMessage = mqtt_->getMessage(); + if (!nextMessage) { return; } + + const auto payload = nextMessage->payload; + const auto topic = core::MqttTopic::kLatencyResponse; + const core::MqttMessage::Header header{.timestamp = 0, + .priority = core::MqttMessagePriority::kNormal}; + const core::MqttMessage message{topic, header, payload}; + mqtt_->publish(message, core::MqttMessageQos::kAtLeastOnce); +} + +void Latency::run() +{ + mqtt_->consume(); + respond(); +} + +core::Result Latency::startNode(const std::string &mqtt_ip, const std::uint32_t mqtt_port) +{ + core::WallClock wall_clock; + core::Logger logger("LATENCY", core::LogLevel::kDebug, wall_clock); + auto optional_mqtt = core::Mqtt::create(logger, "latency", mqtt_ip, mqtt_port); + if (!optional_mqtt) { + logger.log(core::LogLevel::kFatal, "Failed to create MQTT client"); + return core::Result::kFailure; + } + auto mqtt = *optional_mqtt; + Latency latency(mqtt); + latency.run(); + return core::Result::kSuccess; +} + +} // namespace hyped::telemetry diff --git a/lib/telemetry/latency.hpp b/lib/telemetry/latency.hpp new file mode 100644 index 00000000..4a085269 --- /dev/null +++ b/lib/telemetry/latency.hpp @@ -0,0 +1,18 @@ +#pragma once + +#include +#include + +namespace hyped::telemetry { + +class Latency { + public: + Latency(std::shared_ptr mqtt); + void respond(); + void run(); + static core::Result startNode(const std::string &mqtt_ip, const std::uint32_t mqtt_port); + + const std::shared_ptr mqtt_; +}; + +} // namespace hyped::telemetry diff --git a/telemetry/packages/ui/app/context/pods.tsx b/telemetry/packages/ui/app/context/pods.tsx index 8545fa53..c462f828 100644 --- a/telemetry/packages/ui/app/context/pods.tsx +++ b/telemetry/packages/ui/app/context/pods.tsx @@ -152,7 +152,7 @@ export const PodsProvider = ({ children }: { children: React.ReactNode }) => { publish( 'latency/request', JSON.stringify({ - latency: new Date().getTime().toString(), + timestamp: new Date().getTime().toString(), }), podId, ); @@ -233,7 +233,7 @@ export const PodsProvider = ({ children }: { children: React.ReactNode }) => { // calculate the latency const latency = new Date().getTime() - - parseInt(JSON.parse(message.toString())['latency'] as string); + parseInt(JSON.parse(message.toString())['timestamp'] as string); // send warning to the server if the latency is too high if (latency > POD_WARNING_LATENCY) {