Skip to content

Commit

Permalink
Support AMQP filter expressions
Browse files Browse the repository at this point in the history
  • Loading branch information
ansd committed Oct 2, 2024
1 parent 3f9a57e commit ee2e325
Show file tree
Hide file tree
Showing 24 changed files with 1,079 additions and 304 deletions.
8 changes: 3 additions & 5 deletions deps/amqp10_client/src/amqp10_client_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -737,15 +737,13 @@ translate_terminus_durability(configuration) -> 1;
translate_terminus_durability(unsettled_state) -> 2.

translate_filters(Filters)
when is_map(Filters) andalso
map_size(Filters) == 0 ->
when map_size(Filters) =:= 0 ->
undefined;
translate_filters(Filters)
when is_map(Filters) ->
translate_filters(Filters) ->
{map,
maps:fold(
fun
(<<"apache.org:legacy-amqp-headers-binding:map">> = K, V, Acc) when is_map(V) ->
(<<"apache.org:legacy-amqp-headers-binding:map">> = K, V, Acc) when is_map(V) ->
%% special case conversion
Key = sym(K),
[{Key, {described, Key, translate_legacy_amqp_headers_binding(V)}} | Acc];
Expand Down
5 changes: 4 additions & 1 deletion deps/amqp10_client/src/amqp10_msg.erl
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,10 @@ wrap_ap_value(V) when is_integer(V) ->
case V < 0 of
true -> {int, V};
false -> {uint, V}
end.
end;
wrap_ap_value(V) when is_number(V) ->
%% AMQP double and Erlang float are both 64-bit.
{double, V}.

%% LOCAL
header_value(durable, undefined) -> false;
Expand Down
2 changes: 1 addition & 1 deletion deps/amqp10_common/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def all_srcs(name = "all_srcs"):
)
filegroup(
name = "public_hdrs",
srcs = ["include/amqp10_framing.hrl", "include/amqp10_types.hrl"],
srcs = ["include/amqp10_filtex.hrl", "include/amqp10_framing.hrl", "include/amqp10_types.hrl"],
)
filegroup(
name = "private_hdrs",
Expand Down
15 changes: 15 additions & 0 deletions deps/amqp10_common/include/amqp10_filtex.hrl
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.


%% AMQP Filter Expressions Version 1.0 Working Draft 09
%% https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=66227

-define(DESCRIPTOR_NAME_PROPERTIES_FILTER, <<"amqp:properties-filter">>).
-define(DESCRIPTOR_CODE_PROPERTIES_FILTER, 16#173).

-define(DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER, <<"amqp:application-properties-filter">>).
-define(DESCRIPTOR_CODE_APPLICATION_PROPERTIES_FILTER, 16#174).
16 changes: 16 additions & 0 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1207,6 +1207,7 @@ rabbitmq_integration_suite(
name = "amqp_client_SUITE",
size = "large",
additional_beam = [
":test_amqp_utils_beam",
":test_event_recorder_beam",
],
shard_count = 3,
Expand All @@ -1215,6 +1216,16 @@ rabbitmq_integration_suite(
],
)

rabbitmq_integration_suite(
name = "amqp_filtex_SUITE",
additional_beam = [
":test_amqp_utils_beam",
],
runtime_deps = [
"//deps/rabbitmq_amqp_client:erlang_app",
],
)

rabbitmq_integration_suite(
name = "amqp_proxy_protocol_SUITE",
size = "medium",
Expand All @@ -1235,6 +1246,7 @@ rabbitmq_integration_suite(
rabbitmq_integration_suite(
name = "amqp_auth_SUITE",
additional_beam = [
":test_amqp_utils_beam",
":test_event_recorder_beam",
],
shard_count = 2,
Expand All @@ -1246,6 +1258,9 @@ rabbitmq_integration_suite(
rabbitmq_integration_suite(
name = "amqp_address_SUITE",
shard_count = 2,
additional_beam = [
":test_amqp_utils_beam",
],
runtime_deps = [
"//deps/rabbitmq_amqp_client:erlang_app",
],
Expand Down Expand Up @@ -1358,6 +1373,7 @@ eunit(
":test_clustering_utils_beam",
":test_event_recorder_beam",
":test_rabbit_ct_hook_beam",
":test_amqp_utils_beam",
],
target = ":test_erlang_app",
test_env = {
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ define ct_master.erl
endef

PARALLEL_CT_SET_1_A = amqp_client unit_cluster_formation_locking_mocks unit_cluster_formation_sort_nodes unit_collections unit_config_value_encryption unit_connection_tracking
PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_credit_api_v2 amqp_system signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management
PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_credit_api_v2 amqp_filtex amqp_system signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management
PARALLEL_CT_SET_1_C = amqp_proxy_protocol amqpl_consumer_ack amqpl_direct_reply_to backing_queue bindings rabbit_db_maintenance rabbit_db_msup rabbit_db_policy rabbit_db_queue rabbit_db_topic_exchange rabbit_direct_reply_to_prop cluster_limit cluster_minority term_to_binary_compat_prop topic_permission transactions unicode unit_access_control
PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channel_operation_timeout classic_queue classic_queue_prop config_schema peer_discovery_dns peer_discovery_tmp_hidden_node per_node_limit per_user_connection_channel_limit

Expand Down
20 changes: 20 additions & 0 deletions deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def all_beam_files(name = "all_beam_files"):
"src/rabbit_access_control.erl",
"src/rabbit_alarm.erl",
"src/rabbit_amqp1_0.erl",
"src/rabbit_amqp_filtex.erl",
"src/rabbit_amqp_management.erl",
"src/rabbit_amqp_reader.erl",
"src/rabbit_amqp_session.erl",
Expand Down Expand Up @@ -302,6 +303,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/rabbit_access_control.erl",
"src/rabbit_alarm.erl",
"src/rabbit_amqp1_0.erl",
"src/rabbit_amqp_filtex.erl",
"src/rabbit_amqp_management.erl",
"src/rabbit_amqp_reader.erl",
"src/rabbit_amqp_session.erl",
Expand Down Expand Up @@ -578,6 +580,7 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_access_control.erl",
"src/rabbit_alarm.erl",
"src/rabbit_amqp1_0.erl",
"src/rabbit_amqp_filtex.erl",
"src/rabbit_amqp_management.erl",
"src/rabbit_amqp_reader.erl",
"src/rabbit_amqp_session.erl",
Expand Down Expand Up @@ -2195,3 +2198,20 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app"],
)
erlang_bytecode(
name = "amqp_filtex_SUITE_beam_files",
testonly = True,
srcs = ["test/amqp_filtex_SUITE.erl"],
outs = ["test/amqp_filtex_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp10_common:erlang_app"],
)
erlang_bytecode(
name = "test_amqp_utils_beam",
testonly = True,
srcs = ["test/amqp_utils.erl"],
outs = ["test/amqp_utils.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
)
1 change: 1 addition & 0 deletions deps/rabbit/ct.test.spec
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
, amqp_auth_SUITE
, amqp_client_SUITE
, amqp_credit_api_v2_SUITE
, amqp_filtex_SUITE
, amqp_proxy_protocol_SUITE
, amqp_system_SUITE
, amqpl_consumer_ack_SUITE
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/mc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ message_id(BasicMsg) ->
mc_compat:message_id(BasicMsg).

-spec property(atom(), state()) ->
{utf8, binary()} | undefined.
tagged_value().
property(Property, #?MODULE{protocol = Proto,
data = Data}) ->
Proto:property(Property, Data);
Expand Down
42 changes: 29 additions & 13 deletions deps/rabbit/src/mc_amqp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

-define(MESSAGE_ANNOTATIONS_GUESS_SIZE, 100).

-define(SIMPLE_VALUE(V),
-define(IS_SIMPLE_VALUE(V),
is_binary(V) orelse
is_number(V) orelse
is_boolean(V)).
Expand Down Expand Up @@ -145,16 +145,32 @@ property(Prop, #v1{bare_and_footer = Bin,
Props = amqp10_framing:decode(PropsDescribed),
property0(Prop, Props).

property0(correlation_id, #'v1_0.properties'{correlation_id = Corr}) ->
Corr;
property0(message_id, #'v1_0.properties'{message_id = MsgId}) ->
MsgId;
property0(user_id, #'v1_0.properties'{user_id = UserId}) ->
UserId;
property0(subject, #'v1_0.properties'{subject = Subject}) ->
Subject;
property0(to, #'v1_0.properties'{to = To}) ->
To;
property0(message_id, #'v1_0.properties'{message_id = Val}) ->
Val;
property0(user_id, #'v1_0.properties'{user_id = Val}) ->
Val;
property0(to, #'v1_0.properties'{to = Val}) ->
Val;
property0(subject, #'v1_0.properties'{subject = Val}) ->
Val;
property0(reply_to, #'v1_0.properties'{reply_to = Val}) ->
Val;
property0(correlation_id, #'v1_0.properties'{correlation_id = Val}) ->
Val;
property0(content_type, #'v1_0.properties'{content_type = Val}) ->
Val;
property0(content_encoding, #'v1_0.properties'{content_encoding = Val}) ->
Val;
property0(absolute_expiry_time, #'v1_0.properties'{absolute_expiry_time = Val}) ->
Val;
property0(creation_time, #'v1_0.properties'{creation_time = Val}) ->
Val;
property0(group_id, #'v1_0.properties'{group_id = Val}) ->
Val;
property0(group_sequence, #'v1_0.properties'{group_sequence = Val}) ->
Val;
property0(reply_to_group_id, #'v1_0.properties'{reply_to_group_id = Val}) ->
Val;
property0(_Prop, #'v1_0.properties'{}) ->
undefined.

Expand Down Expand Up @@ -454,7 +470,7 @@ message_annotations_as_simple_map(#v1{message_annotations = Content}) ->
message_annotations_as_simple_map0(Content) ->
%% the section record format really is terrible
lists:filtermap(fun({{symbol, K}, {_T, V}})
when ?SIMPLE_VALUE(V) ->
when ?IS_SIMPLE_VALUE(V) ->
{true, {K, V}};
(_) ->
false
Expand All @@ -480,7 +496,7 @@ application_properties_as_simple_map(
application_properties_as_simple_map0(Content, L) ->
%% the section record format really is terrible
lists:foldl(fun({{utf8, K}, {_T, V}}, Acc)
when ?SIMPLE_VALUE(V) ->
when ?IS_SIMPLE_VALUE(V) ->
[{K, V} | Acc];
({{utf8, K}, V}, Acc)
when V =:= undefined orelse is_boolean(V) ->
Expand Down
149 changes: 149 additions & 0 deletions deps/rabbit/src/rabbit_amqp_filtex.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.

%% AMQP Filter Expressions Version 1.0 Working Draft 09
%% https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=66227
-module(rabbit_amqp_filtex).

-include_lib("amqp10_common/include/amqp10_filtex.hrl").

-export([validate/1,
filter/2]).

-type filter_expression() :: {properties, [{atom(), term()}]} |
{application_properties, [{binary(), term()}]}.
-type filter_expressions() :: [filter_expression()].
-export_type([filter_expressions/0]).

-spec validate(tuple()) ->
{ok, filter_expression()} | error.
validate({described, Descriptor, {map, KVList}}) ->
validate0(Descriptor, KVList);
validate(_) ->
error.

-spec filter(filter_expressions(), mc:state()) ->
boolean().
filter(Filters, Mc) ->
%% "A message will pass through a filter-set if and only if
%% it passes through each of the named filters." [3.5.8]
lists:all(fun(Filter) ->
filter0(Filter, Mc)
end, Filters).

%%%%%%%%%%%%%%%%
%%% Internal %%%
%%%%%%%%%%%%%%%%

filter0({properties, KVList}, Mc) ->
%% "The filter evaluates to true if all properties enclosed in the filter expression
%% match the respective properties in the message."
%% [filtex-v1.0-wd09 4.2.4]
lists:all(fun({FieldName, RefVal}) ->
TaggedVal = mc:property(FieldName, Mc),
Val = unwrap(TaggedVal),
match_simple_type(RefVal, Val)
end, KVList);
filter0({application_properties, KVList}, Mc) ->
AppProps = mc:routing_headers(Mc, []),
%% "The filter evaluates to true if all properties enclosed in the filter expression
%% match the respective entries in the application-properties section in the message."
%% [filtex-v1.0-wd09 4.2.5]
lists:all(fun({Key, RefVal}) ->
case AppProps of
#{Key := Val} ->
match_simple_type(RefVal, Val);
_ ->
false
end
end, KVList).

%% [filtex-v1.0-wd09 4.1.1]
%% "A reference field value in a property filter expression matches
%% its corresponding message metadata field value if:
%% [...]
match_simple_type(null, _Val) ->
%% * The reference field value is NULL
true;
match_simple_type(RefVal, Val) ->
%% * the reference field value is of a floating-point or integer number type
%% and the message metadata field is of a different floating-point or integer number type,
%% the reference value and the metadata field value are within the value range of both types,
%% and the values are equal when treated as a floating-point"
RefVal == Val.

validate0(Descriptor, KVList) when
(Descriptor =:= {symbol, ?DESCRIPTOR_NAME_PROPERTIES_FILTER} orelse
Descriptor =:= {ulong, ?DESCRIPTOR_CODE_PROPERTIES_FILTER}) andalso
KVList =/= [] ->
validate_props(KVList, []);
validate0(Descriptor, KVList0) when
(Descriptor =:= {symbol, ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER} orelse
Descriptor =:= {ulong, ?DESCRIPTOR_CODE_APPLICATION_PROPERTIES_FILTER}) andalso
KVList0 =/= [] ->
KVList = lists:map(fun({{utf8, Key}, TaggedVal}) ->
{Key, unwrap(TaggedVal)}
end, KVList0),
{ok, {application_properties, KVList}};
validate0(_, _) ->
error.

validate_props([], Acc) ->
{ok, {properties, lists:reverse(Acc)}};
validate_props([{{symbol, <<"message-id">>}, {Type, Val}} | Rest], Acc) ->
case validate_message_id_type(Type) of
ok ->
validate_props(Rest, [{message_id, Val} | Acc]);
error ->
error
end;
validate_props([{{symbol, <<"user-id">>}, {binary, Val}} | Rest], Acc) ->
validate_props(Rest, [{user_id, Val} | Acc]);
validate_props([{{symbol, <<"to">>}, {utf8, Val}} | Rest], Acc) ->
validate_props(Rest, [{to, Val} | Acc]);
validate_props([{{symbol, <<"subject">>}, {utf8, Val}} | Rest], Acc) ->
validate_props(Rest, [{subject, Val} | Acc]);
validate_props([{{symbol, <<"reply-to">>}, {utf8, Val}} | Rest], Acc) ->
validate_props(Rest, [{reply_to, Val} | Acc]);
validate_props([{{symbol, <<"correlation-id">>}, {Type, Val}} | Rest], Acc) ->
case validate_message_id_type(Type) of
ok ->
validate_props(Rest, [{correlation_id, Val} | Acc]);
error ->
error
end;
validate_props([{{symbol, <<"content-type">>}, {symbol, Val}} | Rest], Acc) ->
validate_props(Rest, [{content_type, Val} | Acc]);
validate_props([{{symbol, <<"content-encoding">>}, {symbol, Val}} | Rest], Acc) ->
validate_props(Rest, [{content_encoding, Val} | Acc]);
validate_props([{{symbol, <<"absolute-expiry-time">>}, {timestamp, Val}} | Rest], Acc) ->
validate_props(Rest, [{absolute_expiry_time, Val} | Acc]);
validate_props([{{symbol, <<"creation-time">>}, {timestamp, Val}} | Rest], Acc) ->
validate_props(Rest, [{creation_time, Val} | Acc]);
validate_props([{{symbol, <<"group-id">>}, {utf8, Val}} | Rest], Acc) ->
validate_props(Rest, [{group_id, Val} | Acc]);
validate_props([{{symbol, <<"group-sequence">>}, {uint, Val}} | Rest], Acc) ->
validate_props(Rest, [{group_sequence, Val} | Acc]);
validate_props([{{symbol, <<"reply-to-group-id">>}, {utf8, Val}} | Rest], Acc) ->
validate_props(Rest, [{reply_to_group_id, Val} | Acc]);
validate_props(_, _) ->
error.

validate_message_id_type(ulong) ->
ok;
validate_message_id_type(uuid) ->
ok;
validate_message_id_type(binary) ->
ok;
validate_message_id_type(utf8) ->
ok;
validate_message_id_type(_) ->
error.

unwrap({_Tag, V}) ->
V;
unwrap(V) ->
V.
Loading

0 comments on commit ee2e325

Please sign in to comment.