From d2888ae785dd52638535c769e70d84ea0811d611 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Mon, 28 Oct 2024 16:46:55 +0100 Subject: [PATCH] WIP Support AMQP 1.0 token renewal Closes #9259. ## What? Allow an AMQP 1.0 client to renew an OAuth 2.0 token before it expires. ## Why? This allows clients to keep the AMQP connection open instead of having to create a new connection whenever the token expires. ## How? As explained in https://github.com/rabbitmq/rabbitmq-server/issues/9259#issuecomment-2437602040 the client can `PUT` a new token on HTTP API v2 path `/auth/tokens`. RabbitMQ will then: 1. Store the new token on the given connection. 2. Recheck access to the connection's vhost. 3. Clear all permission caches in the AMQP sessions. 4. Recheck write permissions to exchanges for links publishing to RabbitMQ, and recheck read permissions from queues for links consuming from RabbitMQ. The latter complies with the user expectation in #11364. TODOs: * Tests * Update 4.1.0 relase notes * Update docs * Check new log process metadata --- deps/rabbit/src/rabbit_access_control.erl | 2 +- deps/rabbit/src/rabbit_amqp_management.erl | 14 +++- deps/rabbit/src/rabbit_amqp_reader.erl | 85 +++++++++++++++------- deps/rabbit/src/rabbit_amqp_session.erl | 43 ++++++++++- deps/rabbit/src/rabbit_channel.erl | 2 +- 5 files changed, 115 insertions(+), 31 deletions(-) diff --git a/deps/rabbit/src/rabbit_access_control.erl b/deps/rabbit/src/rabbit_access_control.erl index cfc8b591eb3f..305a3b743f0f 100644 --- a/deps/rabbit/src/rabbit_access_control.erl +++ b/deps/rabbit/src/rabbit_access_control.erl @@ -249,7 +249,7 @@ check_user_id0(ClaimedUserName, #user{username = ActualUserName, end. -spec update_state(User :: rabbit_types:user(), NewState :: term()) -> - {'ok', rabbit_types:auth_user()} | + {'ok', rabbit_types:user()} | {'refused', string()} | {'error', any()}. diff --git a/deps/rabbit/src/rabbit_amqp_management.erl b/deps/rabbit/src/rabbit_amqp_management.erl index e4555e806033..9cd2669f57b1 100644 --- a/deps/rabbit/src/rabbit_amqp_management.erl +++ b/deps/rabbit/src/rabbit_amqp_management.erl @@ -381,7 +381,19 @@ handle_http_req(<<"GET">>, Bindings0 = rabbit_binding:list_for_source_and_destination(SrcXName, DstName), Bindings = [B || B = #binding{key = K} <- Bindings0, K =:= Key], RespPayload = encode_bindings(Bindings), - {<<"200">>, RespPayload, PermCaches}. + {<<"200">>, RespPayload, PermCaches}; + +handle_http_req(<<"PUT">>, + [<<"auth">>, <<"tokens">>], + _Query, + ReqPayload, + _Vhost, + _User, + ConnPid, + PermCaches) -> + {binary, Token} = ReqPayload, + ok = rabbit_amqp_reader:set_credential(ConnPid, Token), + {<<"204">>, null, PermCaches}. decode_queue({map, KVList}) -> M = lists:foldl( diff --git a/deps/rabbit/src/rabbit_amqp_reader.erl b/deps/rabbit/src/rabbit_amqp_reader.erl index bcfa6a1dcc8c..2b001d0ddd75 100644 --- a/deps/rabbit/src/rabbit_amqp_reader.erl +++ b/deps/rabbit/src/rabbit_amqp_reader.erl @@ -13,7 +13,8 @@ -export([init/1, info/2, - mainloop/2]). + mainloop/2, + set_credential/2]). -export([system_continue/3, system_terminate/4, @@ -53,6 +54,7 @@ channel_max :: non_neg_integer(), auth_mechanism :: sasl_init_unprocessed | {binary(), module()}, auth_state :: term(), + credential_timer :: undefined | reference(), properties :: undefined | {map, list(tuple())} }). @@ -139,6 +141,11 @@ server_properties() -> Props = [{{symbol, <<"node">>}, {utf8, atom_to_binary(node())}} | Props1], {map, Props}. +-spec set_credential(pid(), binary()) -> ok. +set_credential(Pid, Credential) -> + Pid ! {set_credential, Credential}, + ok. + %%-------------------------------------------------------------------------- inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). @@ -243,6 +250,8 @@ handle_other({'$gen_cast', {force_event_refresh, _Ref}}, State) -> State; handle_other(terminate_connection, _State) -> stop; +handle_other({set_credential, Cred}, State) -> + set_credential0(Cred, State); handle_other(credential_expired, State) -> Error = error_frame(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, "credential expired", []), handle_exception(State, 0, Error); @@ -416,15 +425,17 @@ handle_connection_frame( }, helper_sup = HelperSupPid, sock = Sock} = State0) -> - logger:update_process_metadata(#{amqp_container => ContainerId}), Vhost = vhost(Hostname), + logger:update_process_metadata(#{amqp_container => ContainerId, + vhost => Vhost, + user => Username}), ok = check_user_loopback(State0), ok = check_vhost_exists(Vhost, State0), ok = check_vhost_alive(Vhost), ok = rabbit_access_control:check_vhost_access(User, Vhost, {socket, Sock}, #{}), ok = check_vhost_connection_limit(Vhost, Username), ok = check_user_connection_limit(Username), - ok = ensure_credential_expiry_timer(User), + Timer = maybe_start_credential_expiry_timer(User), rabbit_core_metrics:auth_attempt_succeeded(<<>>, Username, amqp10), notify_auth(user_authentication_success, Username, State0), rabbit_log_connection:info( @@ -499,7 +510,8 @@ handle_connection_frame( outgoing_max_frame_size = OutgoingMaxFrameSize, channel_max = EffectiveChannelMax, properties = Properties, - timeout = ReceiveTimeoutMillis}, + timeout = ReceiveTimeoutMillis, + credential_timer = Timer}, heartbeater = Heartbeater}, State = start_writer(State1), HostnameVal = case Hostname of @@ -871,39 +883,58 @@ check_user_connection_limit(Username) -> end. -%% TODO Provide a means for the client to refresh the credential. -%% This could be either via: -%% 1. SASL (if multiple authentications are allowed on the same AMQP 1.0 connection), see -%% https://datatracker.ietf.org/doc/html/rfc4422#section-3.8 , or -%% 2. Claims Based Security (CBS) extension, see https://docs.oasis-open.org/amqp/amqp-cbs/v1.0/csd01/amqp-cbs-v1.0-csd01.html -%% and https://github.com/rabbitmq/rabbitmq-server/issues/9259 -%% 3. Simpler variation of 2. where a token is put to a special /token node. -%% -%% If the user does not refresh their credential on time (the only implementation currently), -%% close the entire connection as we must assume that vhost access could have been revoked. -%% -%% If the user refreshes their credential on time (to be implemented), the AMQP reader should -%% 1. rabbit_access_control:check_vhost_access/4 -%% 2. send a message to all its sessions which should then erase the permission caches and -%% re-check all link permissions (i.e. whether reading / writing to exchanges / queues is still allowed). -%% 3. cancel the current timer, and set a new timer -%% similary as done for Stream connections, see https://github.com/rabbitmq/rabbitmq-server/issues/10292 -ensure_credential_expiry_timer(User) -> +set_credential0(Cred, + State = #v1{connection = #v1_connection{ + user = User0, + vhost = Vhost, + credential_timer = OldTimer} = Conn, + tracked_channels = Chans, + sock = Sock}) -> + rabbit_log:info("updating credential", []), + case rabbit_access_control:update_state(User0, Cred) of + {ok, User} -> + try rabbit_access_control:check_vhost_access(User, Vhost, {socket, Sock}, #{}) of + ok -> + maps:foreach(fun(_ChanNum, Pid) -> + rabbit_amqp_session:reset_authz(Pid, User) + end, Chans), + case OldTimer of + undefined -> + ok; + Ref -> + ok = erlang:cancel_timer(Ref, [{info, false}]) + end, + NewTimer = maybe_start_credential_expiry_timer(User), + State#v1{connection = Conn#v1_connection{user = User, + credential_timer = NewTimer}} + catch _:Reason -> + Error = error_frame(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, + "access to vhost ~s failed for new credential: ~p", + [Vhost, Reason]), + handle_exception(State, 0, Error) + end; + Err -> + Error = error_frame(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, + "credential update failed: ~p", + [Err]), + handle_exception(State, 0, Error) + end. + +maybe_start_credential_expiry_timer(User) -> case rabbit_access_control:expiry_timestamp(User) of never -> - ok; + undefined; Ts when is_integer(Ts) -> Time = (Ts - os:system_time(second)) * 1000, rabbit_log:debug( - "Credential expires in ~b ms frow now (absolute timestamp = ~b seconds since epoch)", + "credential expires in ~b ms frow now (absolute timestamp = ~b seconds since epoch)", [Time, Ts]), case Time > 0 of true -> - _TimerRef = erlang:send_after(Time, self(), credential_expired), - ok; + erlang:send_after(Time, self(), credential_expired); false -> protocol_error(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, - "Credential expired ~b ms ago", [abs(Time)]) + "credential expired ~b ms ago", [abs(Time)]) end end. diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 81e4d88d071d..408041b50418 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -90,7 +90,8 @@ list_local/0, conserve_resources/3, check_resource_access/4, - check_read_permitted_on_topic/4 + check_read_permitted_on_topic/4, + reset_authz/2 ]). -export([init/1, @@ -393,6 +394,10 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName, handle_max = ClientHandleMax}}) -> process_flag(trap_exit, true), rabbit_process_flag:adjust_for_message_handling_proc(), + logger:update_process_metadata(#{channel_number => ChannelNum, + connection => ConnName, + vhost => Vhost, + user => User#user.username}), ok = pg:join(pg_scope(), self(), self()), Alarms0 = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), @@ -480,6 +485,10 @@ list_local() -> conserve_resources(Pid, Source, {_, Conserve, _}) -> gen_server:cast(Pid, {conserve_resources, Source, Conserve}). +-spec reset_authz(pid(), rabbit_types:user()) -> ok. +reset_authz(Pid, User) -> + gen_server:cast(Pid, {reset_authz, User}). + handle_call(Msg, _From, State) -> Reply = {error, {not_understood, Msg}}, reply(Reply, State). @@ -574,6 +583,15 @@ handle_cast({conserve_resources, Alarm, Conserve}, noreply(State); handle_cast(refresh_config, #state{cfg = #cfg{vhost = Vhost} = Cfg} = State0) -> State = State0#state{cfg = Cfg#cfg{trace_state = rabbit_trace:init(Vhost)}}, + noreply(State); +handle_cast({reset_authz, User}, #state{cfg = Cfg} = State0) -> + State1 = State0#state{ + %% Clear permission caches. + permission_cache = [], + topic_permission_cache = [], + %% Set user with new token including new permissions. + cfg = Cfg#cfg{user = User}}, + State = recheck_authz(State1), noreply(State). log_error_and_close_session( @@ -3522,6 +3540,29 @@ check_topic_authorisation(#exchange{type = topic, check_topic_authorisation(_, _, _, _, Cache) -> Cache. +recheck_authz(#state{incoming_links = IncomingLinks, + outgoing_links = OutgoingLinks, + permission_cache = Cache0, + cfg = #cfg{user = User} + } = State) -> + rabbit_log:debug("rechecking link authorizations", []), + Cache1 = maps:fold( + fun(_Handle, #incoming_link{exchange = X}, Cache) -> + case X of + #exchange{name = XName} -> + check_resource_access(XName, write, User, Cache); + #resource{} = XName -> + check_resource_access(XName, write, User, Cache); + to -> + Cache + end + end, Cache0, IncomingLinks), + Cache2 = maps:fold( + fun(_Handle, #outgoing_link{queue_name = QName}, Cache) -> + check_resource_access(QName, read, User, Cache) + end, Cache1, OutgoingLinks), + State#state{permission_cache = Cache2}. + check_user_id(Mc, User) -> case rabbit_access_control:check_user_id(Mc, User) of ok -> diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 8688f5e5e679..0d7bd5bf45d7 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -470,7 +470,7 @@ force_event_refresh(Ref) -> list_queue_states(Pid) -> gen_server2:call(Pid, list_queue_states). --spec update_user_state(pid(), rabbit_types:auth_user()) -> 'ok' | {error, channel_terminated}. +-spec update_user_state(pid(), rabbit_types:user()) -> 'ok' | {error, channel_terminated}. update_user_state(Pid, UserState) when is_pid(Pid) -> case erlang:is_process_alive(Pid) of