diff --git a/deps/rabbit/src/rabbit_access_control.erl b/deps/rabbit/src/rabbit_access_control.erl index cfc8b591eb3..305a3b743f0 100644 --- a/deps/rabbit/src/rabbit_access_control.erl +++ b/deps/rabbit/src/rabbit_access_control.erl @@ -249,7 +249,7 @@ check_user_id0(ClaimedUserName, #user{username = ActualUserName, end. -spec update_state(User :: rabbit_types:user(), NewState :: term()) -> - {'ok', rabbit_types:auth_user()} | + {'ok', rabbit_types:user()} | {'refused', string()} | {'error', any()}. diff --git a/deps/rabbit/src/rabbit_amqp_management.erl b/deps/rabbit/src/rabbit_amqp_management.erl index e4555e80603..9cd2669f57b 100644 --- a/deps/rabbit/src/rabbit_amqp_management.erl +++ b/deps/rabbit/src/rabbit_amqp_management.erl @@ -381,7 +381,19 @@ handle_http_req(<<"GET">>, Bindings0 = rabbit_binding:list_for_source_and_destination(SrcXName, DstName), Bindings = [B || B = #binding{key = K} <- Bindings0, K =:= Key], RespPayload = encode_bindings(Bindings), - {<<"200">>, RespPayload, PermCaches}. + {<<"200">>, RespPayload, PermCaches}; + +handle_http_req(<<"PUT">>, + [<<"auth">>, <<"tokens">>], + _Query, + ReqPayload, + _Vhost, + _User, + ConnPid, + PermCaches) -> + {binary, Token} = ReqPayload, + ok = rabbit_amqp_reader:set_credential(ConnPid, Token), + {<<"204">>, null, PermCaches}. decode_queue({map, KVList}) -> M = lists:foldl( diff --git a/deps/rabbit/src/rabbit_amqp_reader.erl b/deps/rabbit/src/rabbit_amqp_reader.erl index bcfa6a1dcc8..2b001d0ddd7 100644 --- a/deps/rabbit/src/rabbit_amqp_reader.erl +++ b/deps/rabbit/src/rabbit_amqp_reader.erl @@ -13,7 +13,8 @@ -export([init/1, info/2, - mainloop/2]). + mainloop/2, + set_credential/2]). -export([system_continue/3, system_terminate/4, @@ -53,6 +54,7 @@ channel_max :: non_neg_integer(), auth_mechanism :: sasl_init_unprocessed | {binary(), module()}, auth_state :: term(), + credential_timer :: undefined | reference(), properties :: undefined | {map, list(tuple())} }). @@ -139,6 +141,11 @@ server_properties() -> Props = [{{symbol, <<"node">>}, {utf8, atom_to_binary(node())}} | Props1], {map, Props}. +-spec set_credential(pid(), binary()) -> ok. +set_credential(Pid, Credential) -> + Pid ! {set_credential, Credential}, + ok. + %%-------------------------------------------------------------------------- inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). @@ -243,6 +250,8 @@ handle_other({'$gen_cast', {force_event_refresh, _Ref}}, State) -> State; handle_other(terminate_connection, _State) -> stop; +handle_other({set_credential, Cred}, State) -> + set_credential0(Cred, State); handle_other(credential_expired, State) -> Error = error_frame(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, "credential expired", []), handle_exception(State, 0, Error); @@ -416,15 +425,17 @@ handle_connection_frame( }, helper_sup = HelperSupPid, sock = Sock} = State0) -> - logger:update_process_metadata(#{amqp_container => ContainerId}), Vhost = vhost(Hostname), + logger:update_process_metadata(#{amqp_container => ContainerId, + vhost => Vhost, + user => Username}), ok = check_user_loopback(State0), ok = check_vhost_exists(Vhost, State0), ok = check_vhost_alive(Vhost), ok = rabbit_access_control:check_vhost_access(User, Vhost, {socket, Sock}, #{}), ok = check_vhost_connection_limit(Vhost, Username), ok = check_user_connection_limit(Username), - ok = ensure_credential_expiry_timer(User), + Timer = maybe_start_credential_expiry_timer(User), rabbit_core_metrics:auth_attempt_succeeded(<<>>, Username, amqp10), notify_auth(user_authentication_success, Username, State0), rabbit_log_connection:info( @@ -499,7 +510,8 @@ handle_connection_frame( outgoing_max_frame_size = OutgoingMaxFrameSize, channel_max = EffectiveChannelMax, properties = Properties, - timeout = ReceiveTimeoutMillis}, + timeout = ReceiveTimeoutMillis, + credential_timer = Timer}, heartbeater = Heartbeater}, State = start_writer(State1), HostnameVal = case Hostname of @@ -871,39 +883,58 @@ check_user_connection_limit(Username) -> end. -%% TODO Provide a means for the client to refresh the credential. -%% This could be either via: -%% 1. SASL (if multiple authentications are allowed on the same AMQP 1.0 connection), see -%% https://datatracker.ietf.org/doc/html/rfc4422#section-3.8 , or -%% 2. Claims Based Security (CBS) extension, see https://docs.oasis-open.org/amqp/amqp-cbs/v1.0/csd01/amqp-cbs-v1.0-csd01.html -%% and https://github.com/rabbitmq/rabbitmq-server/issues/9259 -%% 3. Simpler variation of 2. where a token is put to a special /token node. -%% -%% If the user does not refresh their credential on time (the only implementation currently), -%% close the entire connection as we must assume that vhost access could have been revoked. -%% -%% If the user refreshes their credential on time (to be implemented), the AMQP reader should -%% 1. rabbit_access_control:check_vhost_access/4 -%% 2. send a message to all its sessions which should then erase the permission caches and -%% re-check all link permissions (i.e. whether reading / writing to exchanges / queues is still allowed). -%% 3. cancel the current timer, and set a new timer -%% similary as done for Stream connections, see https://github.com/rabbitmq/rabbitmq-server/issues/10292 -ensure_credential_expiry_timer(User) -> +set_credential0(Cred, + State = #v1{connection = #v1_connection{ + user = User0, + vhost = Vhost, + credential_timer = OldTimer} = Conn, + tracked_channels = Chans, + sock = Sock}) -> + rabbit_log:info("updating credential", []), + case rabbit_access_control:update_state(User0, Cred) of + {ok, User} -> + try rabbit_access_control:check_vhost_access(User, Vhost, {socket, Sock}, #{}) of + ok -> + maps:foreach(fun(_ChanNum, Pid) -> + rabbit_amqp_session:reset_authz(Pid, User) + end, Chans), + case OldTimer of + undefined -> + ok; + Ref -> + ok = erlang:cancel_timer(Ref, [{info, false}]) + end, + NewTimer = maybe_start_credential_expiry_timer(User), + State#v1{connection = Conn#v1_connection{user = User, + credential_timer = NewTimer}} + catch _:Reason -> + Error = error_frame(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, + "access to vhost ~s failed for new credential: ~p", + [Vhost, Reason]), + handle_exception(State, 0, Error) + end; + Err -> + Error = error_frame(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, + "credential update failed: ~p", + [Err]), + handle_exception(State, 0, Error) + end. + +maybe_start_credential_expiry_timer(User) -> case rabbit_access_control:expiry_timestamp(User) of never -> - ok; + undefined; Ts when is_integer(Ts) -> Time = (Ts - os:system_time(second)) * 1000, rabbit_log:debug( - "Credential expires in ~b ms frow now (absolute timestamp = ~b seconds since epoch)", + "credential expires in ~b ms frow now (absolute timestamp = ~b seconds since epoch)", [Time, Ts]), case Time > 0 of true -> - _TimerRef = erlang:send_after(Time, self(), credential_expired), - ok; + erlang:send_after(Time, self(), credential_expired); false -> protocol_error(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, - "Credential expired ~b ms ago", [abs(Time)]) + "credential expired ~b ms ago", [abs(Time)]) end end. diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 81e4d88d071..408041b5041 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -90,7 +90,8 @@ list_local/0, conserve_resources/3, check_resource_access/4, - check_read_permitted_on_topic/4 + check_read_permitted_on_topic/4, + reset_authz/2 ]). -export([init/1, @@ -393,6 +394,10 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName, handle_max = ClientHandleMax}}) -> process_flag(trap_exit, true), rabbit_process_flag:adjust_for_message_handling_proc(), + logger:update_process_metadata(#{channel_number => ChannelNum, + connection => ConnName, + vhost => Vhost, + user => User#user.username}), ok = pg:join(pg_scope(), self(), self()), Alarms0 = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), @@ -480,6 +485,10 @@ list_local() -> conserve_resources(Pid, Source, {_, Conserve, _}) -> gen_server:cast(Pid, {conserve_resources, Source, Conserve}). +-spec reset_authz(pid(), rabbit_types:user()) -> ok. +reset_authz(Pid, User) -> + gen_server:cast(Pid, {reset_authz, User}). + handle_call(Msg, _From, State) -> Reply = {error, {not_understood, Msg}}, reply(Reply, State). @@ -574,6 +583,15 @@ handle_cast({conserve_resources, Alarm, Conserve}, noreply(State); handle_cast(refresh_config, #state{cfg = #cfg{vhost = Vhost} = Cfg} = State0) -> State = State0#state{cfg = Cfg#cfg{trace_state = rabbit_trace:init(Vhost)}}, + noreply(State); +handle_cast({reset_authz, User}, #state{cfg = Cfg} = State0) -> + State1 = State0#state{ + %% Clear permission caches. + permission_cache = [], + topic_permission_cache = [], + %% Set user with new token including new permissions. + cfg = Cfg#cfg{user = User}}, + State = recheck_authz(State1), noreply(State). log_error_and_close_session( @@ -3522,6 +3540,29 @@ check_topic_authorisation(#exchange{type = topic, check_topic_authorisation(_, _, _, _, Cache) -> Cache. +recheck_authz(#state{incoming_links = IncomingLinks, + outgoing_links = OutgoingLinks, + permission_cache = Cache0, + cfg = #cfg{user = User} + } = State) -> + rabbit_log:debug("rechecking link authorizations", []), + Cache1 = maps:fold( + fun(_Handle, #incoming_link{exchange = X}, Cache) -> + case X of + #exchange{name = XName} -> + check_resource_access(XName, write, User, Cache); + #resource{} = XName -> + check_resource_access(XName, write, User, Cache); + to -> + Cache + end + end, Cache0, IncomingLinks), + Cache2 = maps:fold( + fun(_Handle, #outgoing_link{queue_name = QName}, Cache) -> + check_resource_access(QName, read, User, Cache) + end, Cache1, OutgoingLinks), + State#state{permission_cache = Cache2}. + check_user_id(Mc, User) -> case rabbit_access_control:check_user_id(Mc, User) of ok -> diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 8688f5e5e67..0d7bd5bf45d 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -470,7 +470,7 @@ force_event_refresh(Ref) -> list_queue_states(Pid) -> gen_server2:call(Pid, list_queue_states). --spec update_user_state(pid(), rabbit_types:auth_user()) -> 'ok' | {error, channel_terminated}. +-spec update_user_state(pid(), rabbit_types:user()) -> 'ok' | {error, channel_terminated}. update_user_state(Pid, UserState) when is_pid(Pid) -> case erlang:is_process_alive(Pid) of