Skip to content

Commit

Permalink
Merge pull request #12503 from rabbitmq/rabbitmq-server-12499-check-s…
Browse files Browse the repository at this point in the history
…tream-publisher-reference-length

Return error if stream publisher/consumer reference is longer than 255 characters
  • Loading branch information
michaelklishin authored Oct 11, 2024
2 parents 1726064 + 622dec0 commit 2acc329
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 3 deletions.
44 changes: 42 additions & 2 deletions deps/rabbitmq_stream/src/rabbit_stream_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@
peer_cert_validity]).
-define(UNKNOWN_FIELD, unknown_field).
-define(SILENT_CLOSE_DELAY, 3_000).
-define(MAX_REFERENCE_SIZE, 255).

-import(rabbit_stream_utils, [check_write_permitted/2,
check_read_permitted/3]).

%% client API
-export([start_link/4,
Expand Down Expand Up @@ -1655,6 +1659,26 @@ handle_frame_post_auth(Transport,
{C1#stream_connection{connection_step = failure}, S1}
end,
{Connection1, State1};
handle_frame_post_auth(Transport,
#stream_connection{user = User,
resource_alarm = false} = C,
State,
{request, CorrelationId,
{declare_publisher, _PublisherId, WriterRef, S}})
when is_binary(WriterRef), byte_size(WriterRef) > ?MAX_REFERENCE_SIZE ->
{Code, Counter} = case check_write_permitted(stream_r(S, C), User) of
ok ->
{?RESPONSE_CODE_PRECONDITION_FAILED, ?PRECONDITION_FAILED};
error ->
{?RESPONSE_CODE_ACCESS_REFUSED, ?ACCESS_REFUSED}
end,
response(Transport,
C,
declare_publisher,
CorrelationId,
Code),
rabbit_global_counters:increase_protocol_counter(stream, Counter, 1),
{C, State};
handle_frame_post_auth(Transport,
#stream_connection{user = User,
publishers = Publishers0,
Expand All @@ -1664,7 +1688,7 @@ handle_frame_post_auth(Transport,
State,
{request, CorrelationId,
{declare_publisher, PublisherId, WriterRef, Stream}}) ->
case rabbit_stream_utils:check_write_permitted(stream_r(Stream,
case check_write_permitted(stream_r(Stream,
Connection0),
User)
of
Expand Down Expand Up @@ -1895,6 +1919,19 @@ handle_frame_post_auth(Transport, #stream_connection{} = Connection, State,
{subscribe,
_, _, _, _, _}} = Request) ->
handle_frame_post_auth(Transport, {ok, Connection}, State, Request);
handle_frame_post_auth(Transport, {ok, #stream_connection{user = User} = C}, State,
{request, CorrelationId,
{subscribe, _, S, _, _, #{ <<"name">> := N}}})
when is_binary(N), byte_size(N) > ?MAX_REFERENCE_SIZE ->
{Code, Counter} = case check_read_permitted(stream_r(S, C), User,#{}) of
ok ->
{?RESPONSE_CODE_PRECONDITION_FAILED, ?PRECONDITION_FAILED};
error ->
{?RESPONSE_CODE_ACCESS_REFUSED, ?ACCESS_REFUSED}
end,
response(Transport, C, subscribe, CorrelationId, Code),
rabbit_global_counters:increase_protocol_counter(stream, Counter, 1),
{C, State};
handle_frame_post_auth(Transport,
{ok, #stream_connection{
name = ConnName,
Expand Down Expand Up @@ -3102,7 +3139,7 @@ evaluate_state_after_secret_update(Transport,
{_, Conn1} = ensure_token_expiry_timer(User, Conn0),
PublisherStreams =
lists:foldl(fun(#publisher{stream = Str}, Acc) ->
case rabbit_stream_utils:check_write_permitted(stream_r(Str, Conn0), User) of
case check_write_permitted(stream_r(Str, Conn0), User) of
ok ->
Acc;
_ ->
Expand Down Expand Up @@ -3426,6 +3463,9 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
{not_cleaned, C2#stream_connection{stream_leaders = Leaders1}, S2}
end.

store_offset(Reference, _, _, C) when is_binary(Reference), byte_size(Reference) > ?MAX_REFERENCE_SIZE ->
rabbit_log:warning("Reference is too long to store offset: ~p", [byte_size(Reference)]),
C;
store_offset(Reference, Stream, Offset, Connection0) ->
case lookup_leader(Stream, Connection0) of
{error, Error} ->
Expand Down
68 changes: 67 additions & 1 deletion deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ groups() ->
test_super_stream_duplicate_partitions,
authentication_error_should_close_with_delay,
unauthorized_vhost_access_should_close_with_delay,
sasl_anonymous
sasl_anonymous,
test_publisher_with_too_long_reference_errors,
test_consumer_with_too_long_reference_errors
]},
%% Run `test_global_counters` on its own so the global metrics are
%% initialised to 0 for each testcase
Expand Down Expand Up @@ -945,6 +947,70 @@ unauthorized_vhost_access_should_close_with_delay(Config) ->
closed = wait_for_socket_close(T, S, 10),
ok.

test_publisher_with_too_long_reference_errors(Config) ->
FunctionName = atom_to_binary(?FUNCTION_NAME, utf8),
T = gen_tcp,
Port = get_port(T, Config),
Opts = get_opts(T),
{ok, S} = T:connect("localhost", Port, Opts),
C = rabbit_stream_core:init(0),
ConnectionName = FunctionName,
test_peer_properties(T, S, #{<<"connection_name">> => ConnectionName}, C),
test_authenticate(T, S, C),

Stream = FunctionName,
test_create_stream(T, S, Stream, C),

MaxSize = 255,
ReferenceOK = iolist_to_binary(lists:duplicate(MaxSize, <<"a">>)),
ReferenceKO = iolist_to_binary(lists:duplicate(MaxSize + 1, <<"a">>)),

Tests = [{1, ReferenceOK, ?RESPONSE_CODE_OK},
{2, ReferenceKO, ?RESPONSE_CODE_PRECONDITION_FAILED}],

[begin
F = request({declare_publisher, PubId, Ref, Stream}),
ok = T:send(S, F),
{Cmd, C} = receive_commands(T, S, C),
?assertMatch({response, 1, {declare_publisher, ExpectedResponseCode}}, Cmd)
end || {PubId, Ref, ExpectedResponseCode} <- Tests],

test_delete_stream(T, S, Stream, C),
test_close(T, S, C),
ok.

test_consumer_with_too_long_reference_errors(Config) ->
FunctionName = atom_to_binary(?FUNCTION_NAME, utf8),
T = gen_tcp,
Port = get_port(T, Config),
Opts = get_opts(T),
{ok, S} = T:connect("localhost", Port, Opts),
C = rabbit_stream_core:init(0),
ConnectionName = FunctionName,
test_peer_properties(T, S, #{<<"connection_name">> => ConnectionName}, C),
test_authenticate(T, S, C),

Stream = FunctionName,
test_create_stream(T, S, Stream, C),

MaxSize = 255,
ReferenceOK = iolist_to_binary(lists:duplicate(MaxSize, <<"a">>)),
ReferenceKO = iolist_to_binary(lists:duplicate(MaxSize + 1, <<"a">>)),

Tests = [{1, ReferenceOK, ?RESPONSE_CODE_OK},
{2, ReferenceKO, ?RESPONSE_CODE_PRECONDITION_FAILED}],

[begin
F = request({subscribe, SubId, Stream, first, 1, #{<<"name">> => Ref}}),
ok = T:send(S, F),
{Cmd, C} = receive_commands(T, S, C),
?assertMatch({response, 1, {subscribe, ExpectedResponseCode}}, Cmd)
end || {SubId, Ref, ExpectedResponseCode} <- Tests],

test_delete_stream(T, S, Stream, C),
test_close(T, S, C),
ok.

consumer_offset_info(Config, ConnectionName) ->
[[{offset, Offset},
{offset_lag, Lag}]] = rpc(Config, 0, ?MODULE,
Expand Down

0 comments on commit 2acc329

Please sign in to comment.