Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/interpreter/user defined value condition #1338

Merged
merged 9 commits into from
Aug 23, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -114,17 +114,17 @@ class FieldOperatorApplicationFor<AutowareUniverse>
CONCEALER_PUBLIC explicit FieldOperatorApplicationFor(Ts &&... xs)
: FieldOperatorApplication(std::forward<decltype(xs)>(xs)...),
// clang-format off
getAckermannControlCommand("/control/command/control_cmd", *this),
getAutowareState("/api/iv_msgs/autoware/state", *this),
getCooperateStatusArray("/api/external/get/rtc_status", *this, [this](const auto & v) { latest_cooperate_status_array = v; }),
getEmergencyState("/api/external/get/emergency", *this, [this](const auto & v) { receiveEmergencyState(v); }),
getAckermannControlCommand("/control/command/control_cmd", rclcpp::QoS(1), *this),
getAutowareState("/api/iv_msgs/autoware/state", rclcpp::QoS(1), *this),
getCooperateStatusArray("/api/external/get/rtc_status", rclcpp::QoS(1), *this, [this](const auto & v) { latest_cooperate_status_array = v; }),
getEmergencyState("/api/external/get/emergency", rclcpp::QoS(1), *this, [this](const auto & v) { receiveEmergencyState(v); }),
#if __has_include(<autoware_adapi_v1_msgs/msg/localization_initialization_state.hpp>)
getLocalizationState("/api/localization/initialization_state", *this),
getLocalizationState("/api/localization/initialization_state", rclcpp::QoS(1).transient_local(), *this),
#endif
getMrmState("/api/fail_safe/mrm_state", *this, [this](const auto & v) { receiveMrmState(v); }),
getPathWithLaneId("/planning/scenario_planning/lane_driving/behavior_planning/path_with_lane_id", *this),
getTrajectory("/api/iv_msgs/planning/scenario_planning/trajectory", *this),
getTurnIndicatorsCommandImpl("/control/command/turn_indicators_cmd", *this),
getMrmState("/api/fail_safe/mrm_state", rclcpp::QoS(1), *this, [this](const auto & v) { receiveMrmState(v); }),
getPathWithLaneId("/planning/scenario_planning/lane_driving/behavior_planning/path_with_lane_id", rclcpp::QoS(1), *this),
getTrajectory("/api/iv_msgs/planning/scenario_planning/trajectory", rclcpp::QoS(1), *this),
getTurnIndicatorsCommandImpl("/control/command/turn_indicators_cmd", rclcpp::QoS(1), *this),
requestClearRoute("/api/routing/clear_route", *this),
requestCooperateCommands("/api/external/set/rtc_commands", *this),
requestEngage("/api/external/set/engage", *this),
Expand Down
6 changes: 4 additions & 2 deletions external/concealer/include/concealer/subscriber_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ class SubscriberWrapper

template <typename NodeInterface>
SubscriberWrapper(
const std::string & topic, NodeInterface & autoware_interface,
const std::string & topic, const rclcpp::QoS & quality_of_service,
NodeInterface & autoware_interface,
const std::function<void(const MessageType &)> & callback = {})
: subscription(autoware_interface.template create_subscription<MessageType>(
topic, 1, [this, callback](const typename MessageType::ConstSharedPtr message) {
topic, quality_of_service,
[this, callback](const typename MessageType::ConstSharedPtr message) {
if constexpr (thread_safety == ThreadSafety::safe) {
std::atomic_store(&current_value, message);
if (current_value and callback) {
Expand Down
9 changes: 5 additions & 4 deletions external/concealer/src/autoware_universe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
namespace concealer
{
AutowareUniverse::AutowareUniverse()
: getAckermannControlCommand("/control/command/control_cmd", *this),
getGearCommandImpl("/control/command/gear_cmd", *this),
getTurnIndicatorsCommand("/control/command/turn_indicators_cmd", *this),
: getAckermannControlCommand("/control/command/control_cmd", rclcpp::QoS(1), *this),
getGearCommandImpl("/control/command/gear_cmd", rclcpp::QoS(1), *this),
getTurnIndicatorsCommand("/control/command/turn_indicators_cmd", rclcpp::QoS(1), *this),
getPathWithLaneId(
"/planning/scenario_planning/lane_driving/behavior_planning/path_with_lane_id", *this),
"/planning/scenario_planning/lane_driving/behavior_planning/path_with_lane_id", rclcpp::QoS(1),
*this),
setAcceleration("/localization/acceleration", *this),
setOdometry("/localization/kinematic_state", *this),
setSteeringReport("/vehicle/status/steering_status", *this),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ class UserDefinedValueCondition : private SimulatorCore::NonStandardOperation

std::function<Object()> evaluate_value;

static uint64_t magic_subscription_counter;

public:
const String name;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,47 +33,77 @@ namespace openscenario_interpreter
inline namespace syntax
{
template <typename T>
struct MagicSubscription : private rclcpp::Node, public T
struct MagicSubscription : private T
{
std::promise<void> promise;
struct Node : public rclcpp::Node
{
std::atomic_bool stop_requested = false;

std::thread thread;
std::mutex mutex;

std::exception_ptr thrown;
std::exception_ptr thrown;

typename rclcpp::Subscription<T>::SharedPtr subscription;
std::thread thread;

public:
explicit MagicSubscription(const std::string & node_name, const std::string & topic_name)
: rclcpp::Node(node_name),
thread(
[this](auto future) {
while (rclcpp::ok() and
future.wait_for(std::chrono::milliseconds(1)) == std::future_status::timeout) {
explicit Node()
: rclcpp::Node("magic_subscription"), thread([this]() {
while (rclcpp::ok() and not stop_requested) {
try {
rclcpp::spin_some(get_node_base_interface());
} catch (...) {
auto lock = std::lock_guard(mutex);
thrown = std::current_exception();
}
}
},
std::move(promise.get_future())),
subscription(create_subscription<T>(topic_name, 1, [this](const typename T::SharedPtr message) {
static_cast<T &>(*this) = *message;
}))
})
{
}

~Node()
{
if (thread.joinable() and not stop_requested.exchange(true)) {
thread.join();
}
}

auto rethrow()
{
if (auto lock = std::lock_guard(mutex); thrown) {
std::rethrow_exception(thrown);
}
}
};

static inline std::size_t count = 0;

static inline std::unique_ptr<Node> node = nullptr;

typename rclcpp::Subscription<T>::SharedPtr subscription;

explicit MagicSubscription(const std::string & topic_name)
{
if (not count++) {
node = std::make_unique<Node>();
}

subscription = node->template create_subscription<T>(
topic_name, rclcpp::QoS(1).best_effort(),
[this](const typename T::SharedPtr message) { static_cast<T &>(*this) = *message; });
}

~MagicSubscription()
{
if (thread.joinable()) {
promise.set_value();
thread.join();
if (not --count) {
node.reset();
}
}
};

uint64_t UserDefinedValueCondition::magic_subscription_counter = 0;
auto get() const -> const auto &
{
node->rethrow();
return static_cast<const T &>(*this);
}
};

UserDefinedValueCondition::UserDefinedValueCondition(const pugi::xml_node & node, Scope & scope)
: name(readAttribute<String>("name", node, scope)),
Expand Down Expand Up @@ -134,31 +164,29 @@ UserDefinedValueCondition::UserDefinedValueCondition(const pugi::xml_node & node
using tier4_simulation_msgs::msg::UserDefinedValueType;

evaluate_value =
[&, current_message = std::make_shared<MagicSubscription<UserDefinedValue>>(
result.str(1) + "_subscription_" + std::to_string(++magic_subscription_counter),
result.str(0))]() {
auto evaluate = [](const auto & user_defined_value) {
switch (user_defined_value.type.data) {
[this, subscriber = std::make_shared<MagicSubscription<UserDefinedValue>>(result.str(0))]() {
if (const auto message = subscriber->get(); message.value.empty()) {
return unspecified;
} else {
switch (message.type.data) {
case UserDefinedValueType::BOOLEAN:
return make<Boolean>(user_defined_value.value);
return make<Boolean>(message.value);
case UserDefinedValueType::DATE_TIME:
return make<String>(user_defined_value.value);
return make<String>(message.value);
case UserDefinedValueType::DOUBLE:
return make<Double>(user_defined_value.value);
return make<Double>(message.value);
case UserDefinedValueType::INTEGER:
return make<Integer>(user_defined_value.value);
return make<Integer>(message.value);
case UserDefinedValueType::STRING:
return make<String>(user_defined_value.value);
return make<String>(message.value);
case UserDefinedValueType::UNSIGNED_INT:
return make<UnsignedInt>(user_defined_value.value);
return make<UnsignedInt>(message.value);
case UserDefinedValueType::UNSIGNED_SHORT:
return make<UnsignedShort>(user_defined_value.value);
return make<UnsignedShort>(message.value);
default:
return unspecified;
}
};

return not current_message->value.empty() ? evaluate(*current_message) : unspecified;
}
};
#else
throw SyntaxError(
Expand Down
Loading
Loading