From bdec00f4268a8dfbf3618a245df205d040953b05 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Mon, 9 Sep 2024 15:56:20 +0800 Subject: [PATCH 01/10] rename SESSION_TOKEN_VALIDITY SESSION_TOKEN_TTL --- .../http/v1/session/client_session_manager.rs | 20 +++++++++---------- .../servers/http/v1/session/login_handler.rs | 14 +++++++------ .../servers/http/v1/session/renew_handler.rs | 8 ++++---- 3 files changed, 22 insertions(+), 20 deletions(-) diff --git a/src/query/service/src/servers/http/v1/session/client_session_manager.rs b/src/query/service/src/servers/http/v1/session/client_session_manager.rs index 3811e737bba1..b32fe3726edc 100644 --- a/src/query/service/src/servers/http/v1/session/client_session_manager.rs +++ b/src/query/service/src/servers/http/v1/session/client_session_manager.rs @@ -43,8 +43,8 @@ use crate::sessions::Session; use crate::sessions::SessionPrivilegeManager; /// target TTL -pub const REFRESH_TOKEN_VALIDITY: Duration = Duration::from_hours(4); -pub const SESSION_TOKEN_VALIDITY: Duration = Duration::from_hours(1); +pub const REFRESH_TOKEN_TTL: Duration = Duration::from_hours(4); +pub const SESSION_TOKEN_TTL: Duration = Duration::from_hours(1); /// to cove network latency, retry and time skew const TOKEN_TTL_DELAY: Duration = Duration::from_secs(300); @@ -75,7 +75,7 @@ impl QueryState { pub fn has_expired(&self, now: &Instant) -> bool { match self { QueryState::InUse => false, - QueryState::Idle(t) => (*now - *t) > SESSION_TOKEN_VALIDITY, + QueryState::Idle(t) => (*now - *t) > SESSION_TOKEN_TTL, } } } @@ -132,7 +132,7 @@ impl ClientSessionManager { for (id, mgr) in expired { drop_all_temp_tables(&id, mgr).await.ok(); } - tokio::time::sleep(SESSION_TOKEN_VALIDITY / 4).await; + tokio::time::sleep(SESSION_TOKEN_TTL / 4).await; } } @@ -168,7 +168,7 @@ impl ClientSessionManager { &tenant_name, &user, &auth_role, - REFRESH_TOKEN_VALIDITY + SESSION_TOKEN_VALIDITY, + REFRESH_TOKEN_TTL + SESSION_TOKEN_TTL, ); let refresh_token = if let Some(old) = &old_token_pair { old.refresh.clone() @@ -187,7 +187,7 @@ impl ClientSessionManager { .upsert_token( &refresh_token_hash, token_info.clone(), - REFRESH_TOKEN_VALIDITY + SESSION_TOKEN_VALIDITY + TOKEN_TTL_DELAY, + REFRESH_TOKEN_TTL + SESSION_TOKEN_TTL + TOKEN_TTL_DELAY, false, ) .await?; @@ -197,7 +197,7 @@ impl ClientSessionManager { ClientSession { user_name: claim.user.clone(), }, - REFRESH_TOKEN_VALIDITY + SESSION_TOKEN_VALIDITY + TOKEN_TTL_DELAY, + REFRESH_TOKEN_TTL + SESSION_TOKEN_TTL + TOKEN_TTL_DELAY, ) .await?; if let Some(old) = old_token_pair { @@ -205,7 +205,7 @@ impl ClientSessionManager { .upsert_token( &old.refresh, token_info, - REFRESH_TOKEN_VALIDITY + TOKEN_DROP_DELAY, + REFRESH_TOKEN_TTL + TOKEN_DROP_DELAY, true, ) .await?; @@ -214,7 +214,7 @@ impl ClientSessionManager { .write() .insert(refresh_token_hash.clone(), None); - claim.expire_at_in_secs = (now + SESSION_TOKEN_VALIDITY).as_secs(); + claim.expire_at_in_secs = (now + SESSION_TOKEN_TTL).as_secs(); claim.nonce = uuid::Uuid::new_v4().to_string(); let session_token = claim.encode(); @@ -227,7 +227,7 @@ impl ClientSessionManager { .upsert_token( &session_token_hash, token_info, - REFRESH_TOKEN_VALIDITY + TOKEN_TTL_DELAY, + REFRESH_TOKEN_TTL + TOKEN_TTL_DELAY, false, ) .await?; diff --git a/src/query/service/src/servers/http/v1/session/login_handler.rs b/src/query/service/src/servers/http/v1/session/login_handler.rs index e55904466d55..65dddf1ffae5 100644 --- a/src/query/service/src/servers/http/v1/session/login_handler.rs +++ b/src/query/service/src/servers/http/v1/session/login_handler.rs @@ -24,8 +24,8 @@ use poem::IntoResponse; use crate::servers::http::error::QueryError; use crate::servers::http::v1::session::client_session_manager::ClientSessionManager; -use crate::servers::http::v1::session::client_session_manager::REFRESH_TOKEN_VALIDITY; -use crate::servers::http::v1::session::client_session_manager::SESSION_TOKEN_VALIDITY; +use crate::servers::http::v1::session::client_session_manager::REFRESH_TOKEN_TTL; +use crate::servers::http::v1::session::client_session_manager::SESSION_TOKEN_TTL; use crate::servers::http::v1::HttpQueryContext; #[derive(Deserialize, Clone)] @@ -101,10 +101,12 @@ pub async fn login_handler( Ok((session_id, token_pair)) => Ok(Json(LoginResponse::Ok { version, session_id, - session_token: token_pair.session, - refresh_token: token_pair.refresh, - session_token_validity_in_secs: SESSION_TOKEN_VALIDITY.as_secs(), - refresh_token_validity_in_secs: REFRESH_TOKEN_VALIDITY.as_secs(), + tokens: Some(TokensInfo { + session_token: token_pair.session, + refresh_token: token_pair.refresh, + session_token_ttl_in_secs: SESSION_TOKEN_TTL.as_secs(), + refresh_token_ttl_in_secs: REFRESH_TOKEN_TTL.as_secs(), + }), })), Err(e) => Ok(Json(LoginResponse::Error { error: QueryError::from_error_code(e), diff --git a/src/query/service/src/servers/http/v1/session/renew_handler.rs b/src/query/service/src/servers/http/v1/session/renew_handler.rs index a9e45e73c22f..162d2baa404d 100644 --- a/src/query/service/src/servers/http/v1/session/renew_handler.rs +++ b/src/query/service/src/servers/http/v1/session/renew_handler.rs @@ -21,8 +21,8 @@ use poem::IntoResponse; use crate::servers::http::error::QueryError; use crate::servers::http::v1::session::client_session_manager::ClientSessionManager; use crate::servers::http::v1::session::client_session_manager::TokenPair; -use crate::servers::http::v1::session::client_session_manager::REFRESH_TOKEN_VALIDITY; -use crate::servers::http::v1::session::client_session_manager::SESSION_TOKEN_VALIDITY; +use crate::servers::http::v1::session::client_session_manager::REFRESH_TOKEN_TTL; +use crate::servers::http::v1::session::client_session_manager::SESSION_TOKEN_TTL; use crate::servers::http::v1::HttpQueryContext; #[derive(Deserialize, Clone)] @@ -68,8 +68,8 @@ pub async fn renew_handler( Ok((_, token_pair)) => Ok(Json(RenewResponse::Ok { session_token: token_pair.session, refresh_token: token_pair.refresh, - session_token_validity_in_secs: SESSION_TOKEN_VALIDITY.as_secs(), - refresh_token_validity_in_secs: REFRESH_TOKEN_VALIDITY.as_secs(), + session_token_validity_in_secs: SESSION_TOKEN_TTL.as_secs(), + refresh_token_validity_in_secs: REFRESH_TOKEN_TTL.as_secs(), })), Err(e) => Ok(Json(RenewResponse::Error { error: QueryError::from_error_code(e), From b697f585e5f65b549d8f486fdfa5dd5704b438da Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Tue, 10 Sep 2024 23:15:12 +0800 Subject: [PATCH 02/10] feat: not use session token for JWT session. fix: refresh in memory state when /session/refresh 1. rename renew -> refresh. 2. extract HttpErrorCode. 3. adjust, extract and rename TTLs, add TTL_GRACE_PERIOD_QUERY --- src/query/service/src/auth.rs | 1 + src/query/service/src/servers/http/error.rs | 51 +++++-- .../service/src/servers/http/http_services.rs | 13 +- .../service/src/servers/http/middleware.rs | 10 +- src/query/service/src/servers/http/v1/mod.rs | 4 +- .../http/v1/query/http_query_context.rs | 3 +- .../http/v1/session/client_session_manager.rs | 141 ++++++++++++------ .../src/servers/http/v1/session/consts.rs | 32 ++++ .../servers/http/v1/session/login_handler.rs | 83 ++++++----- .../servers/http/v1/session/logout_handler.rs | 8 +- .../src/servers/http/v1/session/mod.rs | 3 +- .../http/v1/session/refresh_handler.rs | 81 ++++++++++ .../servers/http/v1/session/renew_handler.rs | 78 ---------- .../09_http_handler/09_0007_token.py | 28 ++-- .../09_http_handler/09_0007_token.result | 26 ++-- 15 files changed, 340 insertions(+), 222 deletions(-) create mode 100644 src/query/service/src/servers/http/v1/session/consts.rs create mode 100644 src/query/service/src/servers/http/v1/session/refresh_handler.rs delete mode 100644 src/query/service/src/servers/http/v1/session/renew_handler.rs diff --git a/src/query/service/src/auth.rs b/src/query/service/src/auth.rs index 1d8386050dc1..aa2a133e346d 100644 --- a/src/query/service/src/auth.rs +++ b/src/query/service/src/auth.rs @@ -35,6 +35,7 @@ pub struct AuthMgr { jwt_auth: Option, } +#[derive(Clone)] pub enum Credential { DatabendToken { token: String, diff --git a/src/query/service/src/servers/http/error.rs b/src/query/service/src/servers/http/error.rs index 733e976c82c1..d7da09586cef 100644 --- a/src/query/service/src/servers/http/error.rs +++ b/src/query/service/src/servers/http/error.rs @@ -56,11 +56,42 @@ pub struct JsonErrorOnly { // todo(youngsofun): use this in more place /// turn internal ErrorCode to Http Response with Json Body and proper StatusCode -pub struct JsonErrorCode(pub ErrorCode); +#[derive(Debug)] +pub struct HttpErrorCode { + error_code: ErrorCode, + status_code: Option, +} + +impl HttpErrorCode { + pub fn new(error_code: ErrorCode, status_code: StatusCode) -> Self { + Self { + error_code, + status_code: Some(status_code), + } + } + + pub fn error_code(error_code: ErrorCode) -> Self { + Self { + error_code, + status_code: None, + } + } + + pub fn bad_request(error_code: ErrorCode) -> Self { + Self::new(error_code, StatusCode::BAD_REQUEST) + } -impl ResponseError for JsonErrorCode { + pub fn server_error(error_code: ErrorCode) -> Self { + Self::new(error_code, StatusCode::INTERNAL_SERVER_ERROR) + } +} + +impl ResponseError for HttpErrorCode { fn status(&self) -> StatusCode { - match self.0.code() { + if let Some(s) = self.status_code { + return s; + } + match self.error_code.code() { ErrorCode::AUTHENTICATE_FAILURE | ErrorCode::SESSION_TOKEN_EXPIRED | ErrorCode::SESSION_TOKEN_NOT_FOUND @@ -76,23 +107,17 @@ impl ResponseError for JsonErrorCode { ( self.status(), Json(JsonErrorOnly { - error: QueryError::from_error_code(self.0.clone()), + error: QueryError::from_error_code(self.error_code.clone()), }), ) .into_response() } } -impl Debug for JsonErrorCode { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.0) - } -} - -impl Display for JsonErrorCode { +impl Display for HttpErrorCode { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.0) + write!(f, "{:?} {}", self.status_code, self.error_code) } } -impl std::error::Error for JsonErrorCode {} +impl std::error::Error for HttpErrorCode {} diff --git a/src/query/service/src/servers/http/http_services.rs b/src/query/service/src/servers/http/http_services.rs index ad5df4202c65..99f0ca8b1078 100644 --- a/src/query/service/src/servers/http/http_services.rs +++ b/src/query/service/src/servers/http/http_services.rs @@ -45,7 +45,7 @@ use crate::servers::http::v1::clickhouse_router; use crate::servers::http::v1::list_suggestions; use crate::servers::http::v1::login_handler; use crate::servers::http::v1::query_route; -use crate::servers::http::v1::renew_handler; +use crate::servers::http::v1::refresh_handler; use crate::servers::Server; #[derive(Copy, Clone)] @@ -118,8 +118,15 @@ impl HttpHandler { )), ) .at( - "/session/renew", - post(renew_handler).with(HTTPSessionMiddleware::create( + "/session/refresh", + post(refresh_handler).with(HTTPSessionMiddleware::create( + self.kind, + EndpointKind::Refresh, + )), + ) + .at( + "/auth/verify", + post(refresh_handler).with(HTTPSessionMiddleware::create( self.kind, EndpointKind::Refresh, )), diff --git a/src/query/service/src/servers/http/middleware.rs b/src/query/service/src/servers/http/middleware.rs index d5853950a1c0..f091fd587cc4 100644 --- a/src/query/service/src/servers/http/middleware.rs +++ b/src/query/service/src/servers/http/middleware.rs @@ -60,7 +60,7 @@ use super::v1::HttpQueryContext; use super::v1::SessionClaim; use crate::auth::AuthMgr; use crate::auth::Credential; -use crate::servers::http::error::JsonErrorCode; +use crate::servers::http::error::HttpErrorCode; use crate::servers::http::error::JsonErrorOnly; use crate::servers::http::error::QueryError; use crate::servers::HttpHandlerKind; @@ -314,10 +314,6 @@ impl HTTPSessionEndpoint { if let Some(id) = client_session_id.clone() { session.set_client_session_id(id) } - let databend_token = match credential { - Credential::DatabendToken { token, .. } => Some(token), - _ => None, - }; let session = session_manager.register_session(session)?; @@ -349,6 +345,7 @@ impl HTTPSessionEndpoint { session, query_id, node_id, + credential, expected_node_id, deduplicate_label, user_agent, @@ -357,7 +354,6 @@ impl HTTPSessionEndpoint { http_method: req.method().to_string(), uri: req.uri().to_string(), client_host, - databend_token, client_session_id, }) } @@ -389,7 +385,7 @@ impl Endpoint for HTTPSessionEndpoint { self.ep.call(req).await.map(|v| v.into_response()) } Err(err) => { - let err = JsonErrorCode(err); + let err = HttpErrorCode::error_code(err); if err.status() == StatusCode::UNAUTHORIZED { warn!( "http auth failure: {method} {uri}, headers={:?}, error={}", diff --git a/src/query/service/src/servers/http/v1/mod.rs b/src/query/service/src/servers/http/v1/mod.rs index 15296f68b791..901ad11ee243 100644 --- a/src/query/service/src/servers/http/v1/mod.rs +++ b/src/query/service/src/servers/http/v1/mod.rs @@ -36,8 +36,8 @@ pub use query::HttpSessionConf; pub use session::login_handler::login_handler; pub use session::login_handler::LoginResponse; pub use session::logout_handler::logout_handler; -pub use session::renew_handler::renew_handler; -pub use session::renew_handler::RenewResponse; +pub use session::refresh_handler::refresh_handler; +pub use session::refresh_handler::RefreshResponse; pub use session::ClientSessionManager; pub(crate) use session::SessionClaim; pub use stage::upload_to_stage; diff --git a/src/query/service/src/servers/http/v1/query/http_query_context.rs b/src/query/service/src/servers/http/v1/query/http_query_context.rs index 94052cc0e29b..dbce26eeaa75 100644 --- a/src/query/service/src/servers/http/v1/query/http_query_context.rs +++ b/src/query/service/src/servers/http/v1/query/http_query_context.rs @@ -22,6 +22,7 @@ use poem::FromRequest; use poem::Request; use poem::RequestBody; +use crate::auth::Credential; use crate::servers::http::v1::HttpQueryManager; use crate::sessions::Session; use crate::sessions::SessionManager; @@ -30,6 +31,7 @@ use crate::sessions::SessionType; #[derive(Clone)] pub struct HttpQueryContext { pub session: Arc, + pub credential: Credential, pub query_id: String, pub node_id: String, pub expected_node_id: Option, @@ -40,7 +42,6 @@ pub struct HttpQueryContext { pub http_method: String, pub uri: String, pub client_host: Option, - pub databend_token: Option, pub client_session_id: Option, } diff --git a/src/query/service/src/servers/http/v1/session/client_session_manager.rs b/src/query/service/src/servers/http/v1/session/client_session_manager.rs index b32fe3726edc..b3b400b6ee43 100644 --- a/src/query/service/src/servers/http/v1/session/client_session_manager.rs +++ b/src/query/service/src/servers/http/v1/session/client_session_manager.rs @@ -15,7 +15,6 @@ use std::collections::btree_map::Entry; use std::collections::BTreeMap; use std::sync::Arc; -use std::time::Duration; use databend_common_base::base::GlobalInstance; use databend_common_base::runtime::Thread; @@ -37,21 +36,16 @@ use sha2::Digest; use sha2::Sha256; use tokio::time::Instant; +use crate::servers::http::v1::session::consts::REFRESH_TOKEN_TTL; +use crate::servers::http::v1::session::consts::SESSION_TOKEN_TTL; +use crate::servers::http::v1::session::consts::TOMBSTONE_TTL; +use crate::servers::http::v1::session::consts::TTL_GRACE_PERIOD_META; +use crate::servers::http::v1::session::consts::TTL_GRACE_PERIOD_QUERY; use crate::servers::http::v1::session::token::unix_ts; use crate::servers::http::v1::SessionClaim; use crate::sessions::Session; use crate::sessions::SessionPrivilegeManager; -/// target TTL -pub const REFRESH_TOKEN_TTL: Duration = Duration::from_hours(4); -pub const SESSION_TOKEN_TTL: Duration = Duration::from_hours(1); - -/// to cove network latency, retry and time skew -const TOKEN_TTL_DELAY: Duration = Duration::from_secs(300); -/// in case of client retry, shorted the ttl instead of drop at once -/// only required for refresh token. -const TOKEN_DROP_DELAY: Duration = Duration::from_secs(90); - pub struct TokenPair { pub refresh: String, pub session: String, @@ -81,17 +75,31 @@ impl QueryState { } pub struct ClientSessionManager { + /// cache of tokens to avoid request for MetaServer on each auth. + /// /// store hash only for hit ratio with limited memory, feasible because: /// - token contain all info in itself. /// - for eviction, LRU itself is enough, no need to check expired tokens specifically. session_tokens: RwLock>>, refresh_tokens: RwLock>>, - /// add: write temp table + /// state that + /// 1. lives across query + /// 2. only in memory + /// 3. too large to store in client + /// + /// # Ops + /// add: + /// - write temp table + /// /// rm: /// - all temp table deleted /// - session closed /// - timeout + /// + /// refresh: + /// - query start/stop + /// - /session/refresh session_state: Mutex>, } @@ -136,22 +144,55 @@ impl ClientSessionManager { } } + pub async fn new_session_id_for_jwt(&self, session: &Arc) -> Result { + let client_session_id = uuid::Uuid::new_v4().to_string(); + let tenant = session.get_current_tenant(); + let user_name = session.get_current_user()?.name; + let client_session_api = UserApiProvider::instance().client_session_api(&tenant); + client_session_api + .upsert_client_session_id( + &client_session_id, + ClientSession { user_name }, + REFRESH_TOKEN_TTL + TTL_GRACE_PERIOD_META, + ) + .await?; + Ok(client_session_id) + } + + pub async fn refresh_session_handle( + &self, + session: &Arc, + client_session_id: &str, + ) -> Result<()> { + let tenant = session.get_current_tenant(); + let user_name = session.get_current_user()?.name; + let client_session_api = UserApiProvider::instance().client_session_api(&tenant); + client_session_api + .upsert_client_session_id( + client_session_id, + ClientSession { user_name }, + REFRESH_TOKEN_TTL + TTL_GRACE_PERIOD_META, + ) + .await?; + Ok(()) + } + /// used for both issue token for new session and renew token for existing session. /// currently, when renewing, always return a new refresh token instead of update the TTL, /// since now we include expire time in token, which can not be updated. pub async fn new_token_pair( &self, session: &Arc, - old_token_pair: Option, + old_refresh_token: Option, + old_session_token: Option, ) -> Result<(String, TokenPair)> { // those infos are set when the request is authed let tenant = session.get_current_tenant(); let tenant_name = tenant.tenant_name().to_string(); let user = session.get_current_user()?.name; let auth_role = session.privilege_mgr().get_auth_role(); - - let client_session_id = if let Some(old) = &old_token_pair { - let claim = SessionClaim::decode(&old.refresh)?; + let client_session_id = if let Some(old) = &old_refresh_token { + let claim = SessionClaim::decode(&old)?; assert_eq!(tenant_name, claim.tenant); assert_eq!(user, claim.user); assert_eq!(auth_role, claim.auth_role); @@ -162,58 +203,40 @@ impl ClientSessionManager { let client_session_api = UserApiProvider::instance().client_session_api(&tenant); + // new refresh token let now = unix_ts(); let mut claim = SessionClaim::new( None, &tenant_name, &user, &auth_role, - REFRESH_TOKEN_TTL + SESSION_TOKEN_TTL, + REFRESH_TOKEN_TTL + TTL_GRACE_PERIOD_META, ); - let refresh_token = if let Some(old) = &old_token_pair { - old.refresh.clone() - } else { - claim.encode() - }; + let refresh_token = claim.encode(); let refresh_token_hash = hash_token(refresh_token.as_bytes()); let token_info = QueryTokenInfo { token_type: TokenType::Refresh, parent: None, }; - - // by adding SESSION_TOKEN_VALIDITY_IN_SECS, avoid touch refresh token for each request. - // note the ttl is not accurate, the TTL is in fact longer (by 0 to 1 hour) then expected. client_session_api .upsert_token( &refresh_token_hash, token_info.clone(), - REFRESH_TOKEN_TTL + SESSION_TOKEN_TTL + TOKEN_TTL_DELAY, + REFRESH_TOKEN_TTL + TTL_GRACE_PERIOD_META, false, ) .await?; - client_session_api - .upsert_client_session_id( - &client_session_id, - ClientSession { - user_name: claim.user.clone(), - }, - REFRESH_TOKEN_TTL + SESSION_TOKEN_TTL + TOKEN_TTL_DELAY, - ) - .await?; - if let Some(old) = old_token_pair { + if let Some(old) = old_refresh_token { client_session_api - .upsert_token( - &old.refresh, - token_info, - REFRESH_TOKEN_TTL + TOKEN_DROP_DELAY, - true, - ) + .upsert_token(&old, token_info, TOMBSTONE_TTL, true) .await?; + self.refresh_in_memory_states(&client_session_id, session); }; self.refresh_tokens .write() .insert(refresh_token_hash.clone(), None); + // session token claim.expire_at_in_secs = (now + SESSION_TOKEN_TTL).as_secs(); claim.nonce = uuid::Uuid::new_v4().to_string(); @@ -226,14 +249,30 @@ impl ClientSessionManager { client_session_api .upsert_token( &session_token_hash, - token_info, - REFRESH_TOKEN_TTL + TOKEN_TTL_DELAY, + token_info.clone(), + REFRESH_TOKEN_TTL + TTL_GRACE_PERIOD_META, false, ) .await?; self.session_tokens .write() .insert(session_token_hash, Some(refresh_token_hash)); + if let Some(old) = old_session_token { + client_session_api + .upsert_token(&old, token_info, TOMBSTONE_TTL, true) + .await?; + }; + + // client session id + client_session_api + .upsert_client_session_id( + &client_session_id, + ClientSession { + user_name: claim.user.clone(), + }, + REFRESH_TOKEN_TTL + TTL_GRACE_PERIOD_META, + ) + .await?; Ok((client_session_id, TokenPair { refresh: refresh_token, @@ -248,7 +287,7 @@ impl ClientSessionManager { ) -> Result { let claim = SessionClaim::decode(token)?; let now = unix_ts().as_secs(); - if now > claim.expire_at_in_secs { + if now > claim.expire_at_in_secs + TTL_GRACE_PERIOD_QUERY.as_secs() { return match token_type { TokenType::Refresh => Err(ErrorCode::RefreshTokenExpired("refresh token expired")), TokenType::Session => Err(ErrorCode::SessionTokenExpired("session token expired")), @@ -324,13 +363,23 @@ impl ClientSessionManager { Ok(()) } - pub fn on_query_start(&self, client_session_id: &str, session: &Arc) { + pub fn refresh_in_memory_states(&self, client_session_id: &str, session: &Arc) { let mut guard = self.session_state.lock(); guard.entry(client_session_id.to_string()).and_modify(|e| { e.query_state = QueryState::InUse; session.set_temp_tbl_mgr(e.temp_tbl_mgr.clone()) }); } + + pub fn on_query_start(&self, client_session_id: &str, session: &Arc) { + let mut guard = self.session_state.lock(); + guard.entry(client_session_id.to_string()).and_modify(|e| { + if matches!(e.query_state, QueryState::Idle(_)) { + e.query_state = QueryState::Idle(Instant::now()); + session.set_temp_tbl_mgr(e.temp_tbl_mgr.clone()); + } + }); + } pub fn on_query_finish(&self, client_session_id: &str, session: &Arc) { let temp_tbl_mgr = session.temp_tbl_mgr(); let (is_empty, just_changed) = temp_tbl_mgr.lock().is_empty(); diff --git a/src/query/service/src/servers/http/v1/session/consts.rs b/src/query/service/src/servers/http/v1/session/consts.rs new file mode 100644 index 000000000000..24b3f5938c11 --- /dev/null +++ b/src/query/service/src/servers/http/v1/session/consts.rs @@ -0,0 +1,32 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; + +/// used for both client session id and refresh token TTL +pub const REFRESH_TOKEN_TTL: Duration = Duration::from_hours(4); + +/// used for both session token TTL +pub const SESSION_TOKEN_TTL: Duration = Duration::from_hours(1); + +/// client start timing for TTL later then meta +pub const TTL_GRACE_PERIOD_META: Duration = Duration::from_secs(90); + +/// query server quick check expire_at field in token, which may be set on another server +pub const TTL_GRACE_PERIOD_QUERY: Duration = Duration::from_secs(600); + +/// in case of client retry, shorten the TTL instead of drop at once +/// only required for refresh token. +/// e.g. /session/refresh still need the token for auth when retrying. +pub const TOMBSTONE_TTL: Duration = Duration::from_secs(90); diff --git a/src/query/service/src/servers/http/v1/session/login_handler.rs b/src/query/service/src/servers/http/v1/session/login_handler.rs index 65dddf1ffae5..14069ea14616 100644 --- a/src/query/service/src/servers/http/v1/session/login_handler.rs +++ b/src/query/service/src/servers/http/v1/session/login_handler.rs @@ -22,10 +22,10 @@ use poem::error::Result as PoemResult; use poem::web::Json; use poem::IntoResponse; -use crate::servers::http::error::QueryError; +use crate::auth::Credential; +use crate::servers::http::error::HttpErrorCode; use crate::servers::http::v1::session::client_session_manager::ClientSessionManager; -use crate::servers::http::v1::session::client_session_manager::REFRESH_TOKEN_TTL; -use crate::servers::http::v1::session::client_session_manager::SESSION_TOKEN_TTL; +use crate::servers::http::v1::session::consts::SESSION_TOKEN_TTL; use crate::servers::http::v1::HttpQueryContext; #[derive(Deserialize, Clone)] @@ -36,19 +36,14 @@ struct LoginRequest { } #[derive(Serialize, Debug, Clone)] -#[serde(untagged)] -pub enum LoginResponse { - Ok { - version: String, - session_id: String, - session_token: String, - refresh_token: String, - session_token_validity_in_secs: u64, - refresh_token_validity_in_secs: u64, - }, - Error { - error: QueryError, - }, +pub struct LoginResponse { + version: String, + session_id: String, + refresh_interval_in_secs: u64, + + /// for now, only use session token when authed by user-password + session_token: Option, + refresh_token: Option, } /// Although theses can be checked for each /v1/query for now, @@ -78,7 +73,7 @@ async fn check_login( Ok(()) } -/// # For SQL driver implementer: +/// # For client/driver developer: /// - It is encouraged to call `/v1/session/login` when establishing connection, not mandatory for now. /// - May get 404 when talk to old server, may check `/health` (no `/v1` prefix) to ensure the host:port is not wrong. #[poem::handler] @@ -88,28 +83,38 @@ pub async fn login_handler( Json(req): Json, ) -> PoemResult { let version = QUERY_SEMVER.to_string(); - if let Err(error) = check_login(ctx, &req).await { - return Ok(Json(LoginResponse::Error { - error: QueryError::from_error_code(error), - })); - } - - match ClientSessionManager::instance() - .new_token_pair(&ctx.session, None) + check_login(ctx, &req) .await - { - Ok((session_id, token_pair)) => Ok(Json(LoginResponse::Ok { - version, - session_id, - tokens: Some(TokensInfo { - session_token: token_pair.session, - refresh_token: token_pair.refresh, - session_token_ttl_in_secs: SESSION_TOKEN_TTL.as_secs(), - refresh_token_ttl_in_secs: REFRESH_TOKEN_TTL.as_secs(), - }), - })), - Err(e) => Ok(Json(LoginResponse::Error { - error: QueryError::from_error_code(e), - })), + .map_err(HttpErrorCode::bad_request)?; + + match ctx.credential { + Credential::Jwt { .. } => { + let session_id = ClientSessionManager::instance() + .new_session_id_for_jwt(&ctx.session) + .await + .map_err(HttpErrorCode::server_error)?; + Ok(Json(LoginResponse { + version, + session_id, + refresh_interval_in_secs: SESSION_TOKEN_TTL.as_secs(), + session_token: None, + refresh_token: None, + })) + } + Credential::Password { .. } => { + let (session_id, token_pair) = ClientSessionManager::instance() + .new_token_pair(&ctx.session, None, None) + .await + .map_err(HttpErrorCode::server_error)?; + Ok(Json(LoginResponse { + version, + session_id, + + refresh_interval_in_secs: SESSION_TOKEN_TTL.as_secs(), + session_token: Some(token_pair.session.clone()), + refresh_token: Some(token_pair.refresh.clone()), + })) + } + _ => unreachable!("/session/login expect password or JWT"), } } diff --git a/src/query/service/src/servers/http/v1/session/logout_handler.rs b/src/query/service/src/servers/http/v1/session/logout_handler.rs index d1cebd89273e..576963b3abed 100644 --- a/src/query/service/src/servers/http/v1/session/logout_handler.rs +++ b/src/query/service/src/servers/http/v1/session/logout_handler.rs @@ -17,6 +17,8 @@ use poem::error::Result as PoemResult; use poem::web::Json; use poem::IntoResponse; +use crate::auth::Credential; +use crate::servers::http::error::HttpErrorCode; use crate::servers::http::error::QueryError; use crate::servers::http::v1::session::client_session_manager::ClientSessionManager; use crate::servers::http::v1::HttpQueryContext; @@ -29,12 +31,12 @@ pub struct LogoutResponse { #[poem::handler] #[async_backtrace::framed] pub async fn logout_handler(ctx: &HttpQueryContext) -> PoemResult { - let error = if let Some(token) = &ctx.databend_token { + let error = if let Credential::DatabendToken { token, .. } = &ctx.credential { ClientSessionManager::instance() .drop_client_session(token) .await - .map_err(QueryError::from_error_code) - .err() + .map_err(HttpErrorCode::server_error)?; + None } else { // should not get here since request is already authed Some(QueryError { diff --git a/src/query/service/src/servers/http/v1/session/mod.rs b/src/query/service/src/servers/http/v1/session/mod.rs index 65fed34245bb..9709f81bb5ba 100644 --- a/src/query/service/src/servers/http/v1/session/mod.rs +++ b/src/query/service/src/servers/http/v1/session/mod.rs @@ -13,9 +13,10 @@ // limitations under the License. mod client_session_manager; +mod consts; pub mod login_handler; pub(crate) mod logout_handler; -pub mod renew_handler; +pub mod refresh_handler; mod token; pub use client_session_manager::ClientSessionManager; diff --git a/src/query/service/src/servers/http/v1/session/refresh_handler.rs b/src/query/service/src/servers/http/v1/session/refresh_handler.rs new file mode 100644 index 000000000000..ace0ba993c4f --- /dev/null +++ b/src/query/service/src/servers/http/v1/session/refresh_handler.rs @@ -0,0 +1,81 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_exception::ErrorCode; +use jwt_simple::prelude::Deserialize; +use jwt_simple::prelude::Serialize; +use poem::error::Result as PoemResult; +use poem::web::Json; +use poem::IntoResponse; + +use crate::auth::Credential; +use crate::servers::http::error::HttpErrorCode; +use crate::servers::http::v1::session::client_session_manager::ClientSessionManager; +use crate::servers::http::v1::session::consts::SESSION_TOKEN_TTL; +use crate::servers::http::v1::HttpQueryContext; + +#[derive(Deserialize, Clone)] +struct RefreshRequest { + pub session_id: Option, + // to drop the old token earlier instead of waiting for expiration + pub session_token: Option, +} + +#[derive(Serialize, Debug, Clone)] +pub struct RefreshResponse { + session_token: Option, + refresh_token: Option, + refresh_interval_in_secs: u64, +} + +#[poem::handler] +#[async_backtrace::framed] +pub async fn refresh_handler( + ctx: &HttpQueryContext, + Json(req): Json, +) -> PoemResult { + let mgr = ClientSessionManager::instance(); + match &ctx.credential { + Credential::Jwt { .. } => { + let session_id = + req.session_id + .ok_or(HttpErrorCode::bad_request(ErrorCode::BadArguments( + "JWT session should provide session_id when refresh session", + )))?; + mgr.refresh_in_memory_states(&session_id, &ctx.session); + mgr.refresh_session_handle(&ctx.session, &session_id) + .await + .map_err(HttpErrorCode::server_error)?; + Ok(Json(RefreshResponse { + refresh_interval_in_secs: SESSION_TOKEN_TTL.as_secs(), + session_token: None, + refresh_token: None, + })) + } + Credential::DatabendToken { token, .. } => { + let (_, token_pair) = mgr + .new_token_pair(&ctx.session, Some(token.clone()), req.session_token) + .await + .map_err(HttpErrorCode::server_error)?; + Ok(Json(RefreshResponse { + refresh_interval_in_secs: SESSION_TOKEN_TTL.as_secs(), + session_token: Some(token_pair.session.clone()), + refresh_token: Some(token_pair.refresh.clone()), + })) + } + _ => { + unreachable!("/session/refresh should be authed by databend refresh token or JWT token") + } + } +} diff --git a/src/query/service/src/servers/http/v1/session/renew_handler.rs b/src/query/service/src/servers/http/v1/session/renew_handler.rs deleted file mode 100644 index 162d2baa404d..000000000000 --- a/src/query/service/src/servers/http/v1/session/renew_handler.rs +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use jwt_simple::prelude::Deserialize; -use jwt_simple::prelude::Serialize; -use poem::error::Result as PoemResult; -use poem::web::Json; -use poem::IntoResponse; - -use crate::servers::http::error::QueryError; -use crate::servers::http::v1::session::client_session_manager::ClientSessionManager; -use crate::servers::http::v1::session::client_session_manager::TokenPair; -use crate::servers::http::v1::session::client_session_manager::REFRESH_TOKEN_TTL; -use crate::servers::http::v1::session::client_session_manager::SESSION_TOKEN_TTL; -use crate::servers::http::v1::HttpQueryContext; - -#[derive(Deserialize, Clone)] -struct RenewRequest { - pub session_token: String, -} - -#[derive(Serialize, Debug, Clone)] -#[serde(untagged)] -pub enum RenewResponse { - Ok { - session_token: String, - refresh_token: String, - session_token_validity_in_secs: u64, - refresh_token_validity_in_secs: u64, - }, - Error { - error: QueryError, - }, -} - -#[poem::handler] -#[async_backtrace::framed] -pub async fn renew_handler( - ctx: &HttpQueryContext, - Json(req): Json, -) -> PoemResult { - let refresh_token = ctx - .databend_token - .as_ref() - .expect("/session/renew should be authed by refresh token") - .clone(); - match ClientSessionManager::instance() - .new_token_pair( - &ctx.session, - Some(TokenPair { - session: req.session_token, - refresh: refresh_token, - }), - ) - .await - { - Ok((_, token_pair)) => Ok(Json(RenewResponse::Ok { - session_token: token_pair.session, - refresh_token: token_pair.refresh, - session_token_validity_in_secs: SESSION_TOKEN_TTL.as_secs(), - refresh_token_validity_in_secs: REFRESH_TOKEN_TTL.as_secs(), - })), - Err(e) => Ok(Json(RenewResponse::Error { - error: QueryError::from_error_code(e), - })), - } -} diff --git a/tests/suites/1_stateful/09_http_handler/09_0007_token.py b/tests/suites/1_stateful/09_http_handler/09_0007_token.py index 52507fd4c9d2..187ad2ccbae5 100755 --- a/tests/suites/1_stateful/09_http_handler/09_0007_token.py +++ b/tests/suites/1_stateful/09_http_handler/09_0007_token.py @@ -11,7 +11,7 @@ query_url = "http://localhost:8000/v1/query" login_url = "http://localhost:8000/v1/session/login" logout_url = "http://localhost:8000/v1/session/logout" -renew_url = "http://localhost:8000/v1/session/renew" +renew_url = "http://localhost:8000/v1/session/refresh" auth = ("root", "") @@ -51,7 +51,7 @@ def do_logout(_case_id, session_token): @print_error -def do_renew(_case_id, refresh_token, session_token): +def do_refresh(_case_id, refresh_token, session_token): payload = {"session_token": session_token} response = requests.post( renew_url, @@ -80,7 +80,8 @@ def do_query(query, session_token): def fake_expired_token(): expired_claim = { - "exp": int(time.time()) - 10, + # TTL_GRACE_PERIOD_QUERY = 600 + "exp": int(time.time()) - 610, "tenant": "", "user": "", "nonce": "", @@ -94,7 +95,6 @@ def fake_expired_token(): def main(): login_resp = do_login() pprint(sorted(login_resp.keys())) - print(login_resp.get("refresh_token_validity_in_secs")) session_token = login_resp.get("session_token") refresh_token = login_resp.get("refresh_token") # print(session_token) @@ -108,9 +108,8 @@ def main(): do_query("select 4", fake_expired_token()) do_query("select 5", refresh_token) - renew_resp = do_renew(1, refresh_token, session_token) + renew_resp = do_refresh(1, refresh_token, session_token) pprint(sorted(renew_resp.keys())) - print(renew_resp.get("refresh_token_validity_in_secs")) new_session_token = renew_resp.get("session_token") new_refresh_token = renew_resp.get("refresh_token") @@ -122,20 +121,23 @@ def main(): pprint(query_resp.get("data")) # errors - do_renew(2, "xxx", session_token) - do_renew(3, "bend-v1-xxx", session_token) - do_renew(4, fake_expired_token(), session_token) - do_renew(5, session_token, session_token) + do_refresh(2, "xxx", session_token) + do_refresh(3, "bend-v1-xxx", session_token) + do_refresh(4, fake_expired_token(), session_token) + do_refresh(5, session_token, session_token) # test new_refresh_token works - do_renew(6, new_refresh_token, session_token) + do_refresh(6, new_refresh_token, session_token) do_logout(0, new_refresh_token) do_logout(1, new_session_token) do_query("select 'after logout'", new_session_token) - do_renew("after_logout", new_refresh_token, session_token) + do_refresh("after_logout", new_refresh_token, session_token) if __name__ == "__main__": - main() + try: + main() + except Exception as e: + print(f"An error occurred: {e}") diff --git a/tests/suites/1_stateful/09_http_handler/09_0007_token.result b/tests/suites/1_stateful/09_http_handler/09_0007_token.result index 7cf035f4b1c7..4d9c49940059 100644 --- a/tests/suites/1_stateful/09_http_handler/09_0007_token.result +++ b/tests/suites/1_stateful/09_http_handler/09_0007_token.result @@ -1,12 +1,10 @@ ---- do_login() 200 -['refresh_token', - 'refresh_token_validity_in_secs', +['refresh_interval_in_secs', + 'refresh_token', 'session_id', 'session_token', - 'session_token_validity_in_secs', 'version'] -14400 ---- do_query('select 1',) 200 [['1']] @@ -24,34 +22,30 @@ ---- do_query('select 5',) 401 {'code': 5103, 'message': 'session token not found'} ----- do_renew(1,) +---- do_refresh(1,) 200 -['refresh_token', - 'refresh_token_validity_in_secs', - 'session_token', - 'session_token_validity_in_secs'] -14400 +['refresh_interval_in_secs', 'refresh_token', 'session_token'] ---- do_query('select 6',) 200 [['6']] ---- do_query('select 7',) 200 [['7']] ----- do_renew(2,) +---- do_refresh(2,) 401 {'code': 5100, 'message': 'jwt auth not configured.'} ----- do_renew(3,) +---- do_refresh(3,) 401 {'code': 5100, 'message': 'fail to decode token(base64 decode error: Invalid padding): ' 'bend-v1-xxx'} ----- do_renew(4,) +---- do_refresh(4,) 401 {'code': 5102, 'message': 'refresh token expired'} ----- do_renew(5,) +---- do_refresh(5,) 401 {'code': 5104, 'message': 'refresh token not found'} ----- do_renew(6,) +---- do_refresh(6,) 200 ---- do_logout(0,) 401 @@ -61,6 +55,6 @@ ---- do_query("select 'after logout'",) 401 {'code': 5103, 'message': 'session token not found'} ----- do_renew('after_logout',) +---- do_refresh('after_logout',) 401 {'code': 5104, 'message': 'refresh token not found'} From 9cb4a277131faf18b8496aebb0d7d7345a4da2a2 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Wed, 11 Sep 2024 22:59:04 +0800 Subject: [PATCH 03/10] test cluster. --- .../1_stateful/09_http_handler/09_0007_token.py | 15 +++++++++++++-- .../09_http_handler/09_0007_token.result | 5 +++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/tests/suites/1_stateful/09_http_handler/09_0007_token.py b/tests/suites/1_stateful/09_http_handler/09_0007_token.py index 187ad2ccbae5..04b25881e978 100755 --- a/tests/suites/1_stateful/09_http_handler/09_0007_token.py +++ b/tests/suites/1_stateful/09_http_handler/09_0007_token.py @@ -9,6 +9,7 @@ # Define the URLs and credentials query_url = "http://localhost:8000/v1/query" +query_url2 = "http://localhost:8002/v1/query" login_url = "http://localhost:8000/v1/session/login" logout_url = "http://localhost:8000/v1/session/logout" renew_url = "http://localhost:8000/v1/session/refresh" @@ -65,10 +66,10 @@ def do_refresh(_case_id, refresh_token, session_token): @print_error -def do_query(query, session_token): +def do_query(query, session_token, url=query_url): query_payload = {"sql": query, "pagination": {"wait_time_secs": 11}} response = requests.post( - query_url, + url, headers={ "Content-Type": "application/json", "Authorization": f"Bearer {session_token}", @@ -102,6 +103,16 @@ def main(): # ok query_resp = do_query("select 1", session_token) pprint(query_resp.get("data")) + + # cluster + query_resp = do_query("select count(*) from system.clusters", session_token) + num_nodes = int(query_resp.get("data")[0][0]) + url = query_url + if num_nodes > 1: + url = query_url2 + query_resp = do_query("select 'cluster'", session_token, url) + pprint(query_resp.get("data")) + # errors do_query("select 2", "xxx") do_query("select 3", "bend-v1-xxx") diff --git a/tests/suites/1_stateful/09_http_handler/09_0007_token.result b/tests/suites/1_stateful/09_http_handler/09_0007_token.result index 4d9c49940059..f46d4632414c 100644 --- a/tests/suites/1_stateful/09_http_handler/09_0007_token.result +++ b/tests/suites/1_stateful/09_http_handler/09_0007_token.result @@ -8,6 +8,11 @@ ---- do_query('select 1',) 200 [['1']] +---- do_query('select count(*) from system.clusters',) +200 +---- do_query("select 'cluster'",) +200 +[['cluster']] ---- do_query('select 2',) 401 {'code': 5100, 'message': 'jwt auth not configured.'} From 0e89458c5fdc9910359ee4c28ab46c7a62a0c98f Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Thu, 12 Sep 2024 19:58:12 +0800 Subject: [PATCH 04/10] add endpoint /v1/auth/verify. --- src/meta/app/src/principal/user_token.rs | 27 +++++++++++++ src/query/service/src/auth.rs | 12 +----- .../service/src/servers/http/http_services.rs | 13 ++++++- .../service/src/servers/http/middleware.rs | 22 ++++++----- .../http/v1/session/client_session_manager.rs | 16 +++----- .../src/servers/http/v1/session/token.rs | 22 +++++++---- .../09_http_handler/09_0007_token.py | 38 ++++++++++++++++--- .../09_http_handler/09_0007_token.result | 12 +++--- 8 files changed, 112 insertions(+), 50 deletions(-) diff --git a/src/meta/app/src/principal/user_token.rs b/src/meta/app/src/principal/user_token.rs index ed6249241f07..2f8add5c02d2 100644 --- a/src/meta/app/src/principal/user_token.rs +++ b/src/meta/app/src/principal/user_token.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fmt::Display; + +use databend_common_exception::ErrorCode; use serde::Deserialize; use serde::Serialize; @@ -26,6 +29,30 @@ pub enum TokenType { Session = 2, } +impl Display for TokenType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", match self { + TokenType::Refresh => 'r', + TokenType::Session => 's', + }) + } +} + +impl TryFrom for TokenType { + type Error = ErrorCode; + + fn try_from(value: u8) -> Result { + let value = value as char; + match value { + 'r' => Ok(TokenType::Refresh), + 's' => Ok(TokenType::Session), + _ => Err(ErrorCode::AuthenticateFailure(format!( + "invalid token type '{value}'" + ))), + } + } +} + #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] pub struct QueryTokenInfo { pub token_type: TokenType, diff --git a/src/query/service/src/auth.rs b/src/query/service/src/auth.rs index aa2a133e346d..ed39f992bf05 100644 --- a/src/query/service/src/auth.rs +++ b/src/query/service/src/auth.rs @@ -18,7 +18,6 @@ use databend_common_base::base::GlobalInstance; use databend_common_config::InnerConfig; use databend_common_exception::ErrorCode; use databend_common_exception::Result; -use databend_common_meta_app::principal::user_token::TokenType; use databend_common_meta_app::principal::AuthInfo; use databend_common_meta_app::principal::UserIdentity; use databend_common_meta_app::principal::UserInfo; @@ -39,7 +38,6 @@ pub struct AuthMgr { pub enum Credential { DatabendToken { token: String, - token_type: TokenType, set_user: bool, }, Jwt { @@ -82,14 +80,8 @@ impl AuthMgr { let user_api = UserApiProvider::instance(); match credential { Credential::NoNeed => Ok(None), - Credential::DatabendToken { - token, - set_user, - token_type, - } => { - let claim = ClientSessionManager::instance() - .verify_token(token, token_type.clone()) - .await?; + Credential::DatabendToken { token, set_user } => { + let claim = ClientSessionManager::instance().verify_token(token).await?; let tenant = Tenant::new_or_err(claim.tenant.to_string(), func_name!())?; if *set_user { let identity = UserIdentity::new(claim.user, "%"); diff --git a/src/query/service/src/servers/http/http_services.rs b/src/query/service/src/servers/http/http_services.rs index 99f0ca8b1078..abd9ec8f58ec 100644 --- a/src/query/service/src/servers/http/http_services.rs +++ b/src/query/service/src/servers/http/http_services.rs @@ -21,6 +21,7 @@ use databend_common_exception::ErrorCode; use databend_common_http::HttpError; use databend_common_http::HttpShutdownHandler; use databend_common_meta_types::anyerror::AnyError; +use http::StatusCode; use log::info; use poem::get; use poem::listener::OpensslTlsConfig; @@ -32,11 +33,13 @@ use poem::put; use poem::Endpoint; use poem::EndpointExt; use poem::IntoEndpoint; +use poem::IntoResponse; use poem::Route; use super::v1::discovery_nodes; use super::v1::logout_handler; use super::v1::upload_to_stage; +use super::v1::HttpQueryContext; use crate::servers::http::middleware::json_response; use crate::servers::http::middleware::EndpointKind; use crate::servers::http::middleware::HTTPSessionMiddleware; @@ -81,6 +84,12 @@ pub struct HttpHandler { kind: HttpHandlerKind, } +#[poem::handler] +#[async_backtrace::framed] +pub async fn verify_handler(_ctx: &HttpQueryContext) -> poem::Result { + Ok(StatusCode::OK) +} + impl HttpHandler { pub fn create(kind: HttpHandlerKind) -> Box { Box::new(HttpHandler { @@ -126,9 +135,9 @@ impl HttpHandler { ) .at( "/auth/verify", - post(refresh_handler).with(HTTPSessionMiddleware::create( + get(verify_handler).with(HTTPSessionMiddleware::create( self.kind, - EndpointKind::Refresh, + EndpointKind::Verify, )), ) .at( diff --git a/src/query/service/src/servers/http/middleware.rs b/src/query/service/src/servers/http/middleware.rs index f091fd587cc4..e4acd99b45ae 100644 --- a/src/query/service/src/servers/http/middleware.rs +++ b/src/query/service/src/servers/http/middleware.rs @@ -76,6 +76,7 @@ pub enum EndpointKind { PollQuery, Clickhouse, NoAuth, + Verify, } const USER_AGENT: &str = "User-Agent"; @@ -209,11 +210,12 @@ fn auth_by_header( Some(bearer) => { let token = bearer.token().to_string(); if SessionClaim::is_databend_token(&token) { - let (token_type, set_user) = match endpoint_kind { - EndpointKind::Refresh => (TokenType::Refresh, true), - EndpointKind::StartQuery => (TokenType::Session, true), + let (exp_token_type, set_user) = match endpoint_kind { + EndpointKind::Verify => (None, false), + EndpointKind::Refresh => (Some(TokenType::Refresh), true), + EndpointKind::StartQuery => (Some(TokenType::Session), true), EndpointKind::PollQuery | EndpointKind::Logout => { - (TokenType::Session, false) + (Some(TokenType::Session), false) } EndpointKind::Login => { return Err(ErrorCode::AuthenticateFailure( @@ -229,11 +231,13 @@ fn auth_by_header( unreachable!() } }; - Ok(Credential::DatabendToken { - token, - token_type, - set_user, - }) + + if let Some(t) = exp_token_type { + if t != SessionClaim::get_type(&token)? { + return Err(ErrorCode::AuthenticateFailure("wrong data token type")); + } + } + Ok(Credential::DatabendToken { token, set_user }) } else { Ok(Credential::Jwt { token, client_ip }) } diff --git a/src/query/service/src/servers/http/v1/session/client_session_manager.rs b/src/query/service/src/servers/http/v1/session/client_session_manager.rs index b3b400b6ee43..e04f2022f745 100644 --- a/src/query/service/src/servers/http/v1/session/client_session_manager.rs +++ b/src/query/service/src/servers/http/v1/session/client_session_manager.rs @@ -192,7 +192,7 @@ impl ClientSessionManager { let user = session.get_current_user()?.name; let auth_role = session.privilege_mgr().get_auth_role(); let client_session_id = if let Some(old) = &old_refresh_token { - let claim = SessionClaim::decode(&old)?; + let (claim, _) = SessionClaim::decode(&old)?; assert_eq!(tenant_name, claim.tenant); assert_eq!(user, claim.user); assert_eq!(auth_role, claim.auth_role); @@ -212,7 +212,7 @@ impl ClientSessionManager { &auth_role, REFRESH_TOKEN_TTL + TTL_GRACE_PERIOD_META, ); - let refresh_token = claim.encode(); + let refresh_token = claim.encode(TokenType::Refresh); let refresh_token_hash = hash_token(refresh_token.as_bytes()); let token_info = QueryTokenInfo { token_type: TokenType::Refresh, @@ -240,7 +240,7 @@ impl ClientSessionManager { claim.expire_at_in_secs = (now + SESSION_TOKEN_TTL).as_secs(); claim.nonce = uuid::Uuid::new_v4().to_string(); - let session_token = claim.encode(); + let session_token = claim.encode(TokenType::Session); let session_token_hash = hash_token(session_token.as_bytes()); let token_info = QueryTokenInfo { token_type: TokenType::Session, @@ -280,12 +280,8 @@ impl ClientSessionManager { })) } - pub(crate) async fn verify_token( - self: &Arc, - token: &str, - token_type: TokenType, - ) -> Result { - let claim = SessionClaim::decode(token)?; + pub(crate) async fn verify_token(self: &Arc, token: &str) -> Result { + let (claim, token_type) = SessionClaim::decode(token)?; let now = unix_ts().as_secs(); if now > claim.expire_at_in_secs + TTL_GRACE_PERIOD_QUERY.as_secs() { return match token_type { @@ -325,7 +321,7 @@ impl ClientSessionManager { Ok(claim) } pub async fn drop_client_session(self: &Arc, session_token: &str) -> Result<()> { - let claim = SessionClaim::decode(session_token)?; + let (claim, _) = SessionClaim::decode(session_token)?; let now = unix_ts().as_secs(); let client_session_api = diff --git a/src/query/service/src/servers/http/v1/session/token.rs b/src/query/service/src/servers/http/v1/session/token.rs index 2fd546941d60..3355d3933ca4 100644 --- a/src/query/service/src/servers/http/v1/session/token.rs +++ b/src/query/service/src/servers/http/v1/session/token.rs @@ -17,12 +17,13 @@ use std::time::Duration; use base64::prelude::*; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use databend_common_meta_app::principal::user_token::TokenType; use rand::rngs::OsRng; use rand::RngCore; use serde::Deserialize; use serde::Serialize; -const TOKEN_PREFIX: &str = "bend-v1-"; +const TOKEN_PREFIX: &str = "bend-v1"; fn generate_secure_nonce() -> String { let mut random_bytes = [0u8; 16]; OsRng.fill_bytes(&mut random_bytes); @@ -68,23 +69,30 @@ impl SessionClaim { token.starts_with(TOKEN_PREFIX) } - pub(crate) fn encode(&self) -> String { + pub fn get_type(token: &str) -> Result { + TokenType::try_from(token.as_bytes()[TOKEN_PREFIX.len() + 1]) + } + + pub(crate) fn encode(&self, token_type: TokenType) -> String { let token = BASE64_STANDARD.encode(serde_json::to_vec(&self).unwrap()); - format!("{TOKEN_PREFIX}{token}") + let t = token_type.to_string(); + format!("{TOKEN_PREFIX}-{t}-{token}") } - pub fn decode(token: &str) -> Result { + pub fn decode(token: &str) -> Result<(Self, TokenType)> { let fmt_err = |reason: String| { ErrorCode::AuthenticateFailure(format!("fail to decode token({reason}): {token}")) }; + let t = Self::get_type(token)?; if token.len() < TOKEN_PREFIX.len() { return Err(fmt_err("too short".to_string())); } - let token = &token.as_bytes()[TOKEN_PREFIX.len()..]; + let token = &token.as_bytes()[TOKEN_PREFIX.len() + 3..]; let json = BASE64_STANDARD .decode(token) .map_err(|e| fmt_err(format!("base64 decode error: {e}")))?; - serde_json::from_slice::(&json) - .map_err(|e| fmt_err(format!("json decode error: {e}"))) + let claim = serde_json::from_slice::(&json) + .map_err(|e| fmt_err(format!("json decode error: {e}")))?; + Ok((claim, t)) } } diff --git a/tests/suites/1_stateful/09_http_handler/09_0007_token.py b/tests/suites/1_stateful/09_http_handler/09_0007_token.py index 04b25881e978..cba2bf4f2870 100755 --- a/tests/suites/1_stateful/09_http_handler/09_0007_token.py +++ b/tests/suites/1_stateful/09_http_handler/09_0007_token.py @@ -13,6 +13,7 @@ login_url = "http://localhost:8000/v1/session/login" logout_url = "http://localhost:8000/v1/session/logout" renew_url = "http://localhost:8000/v1/session/refresh" +verify_url = "http://localhost:8000/v1/auth/verify" auth = ("root", "") @@ -51,6 +52,33 @@ def do_logout(_case_id, session_token): return response +def do_verify(session_token): + for token in [session_token, 'xxx']: + print("---- verify token ", token) + response = requests.get( + verify_url, + headers={"Authorization": f"Bearer {token}"}, + ) + print(response.status_code) + print(response.text) + + for a in [auth, ('u', 'p')]: + print("---- verify password: ", a) + response = requests.post( + verify_url, + auth=a, + ) + print(response.status_code) + print(response.text) + + print("---- verify no auth header ", token) + response = requests.get( + verify_url, + ) + print(response.status_code) + print(response.text) + + @print_error def do_refresh(_case_id, refresh_token, session_token): payload = {"session_token": session_token} @@ -79,7 +107,7 @@ def do_query(query, session_token, url=query_url): return response -def fake_expired_token(): +def fake_expired_token(ty): expired_claim = { # TTL_GRACE_PERIOD_QUERY = 600 "exp": int(time.time()) - 610, @@ -88,7 +116,7 @@ def fake_expired_token(): "nonce": "", "sid": "", } - return "bend-v1-" + base64.b64encode( + return "bend-v1-" + ty + '-' + base64.b64encode( json.dumps(expired_claim).encode("utf-8") ).decode("utf-8") @@ -115,8 +143,8 @@ def main(): # errors do_query("select 2", "xxx") - do_query("select 3", "bend-v1-xxx") - do_query("select 4", fake_expired_token()) + do_query("select 3", "bend-v1-s-xxx") + do_query("select 4", fake_expired_token('s')) do_query("select 5", refresh_token) renew_resp = do_refresh(1, refresh_token, session_token) @@ -134,7 +162,7 @@ def main(): # errors do_refresh(2, "xxx", session_token) do_refresh(3, "bend-v1-xxx", session_token) - do_refresh(4, fake_expired_token(), session_token) + do_refresh(4, fake_expired_token('r'), session_token) do_refresh(5, session_token, session_token) # test new_refresh_token works diff --git a/tests/suites/1_stateful/09_http_handler/09_0007_token.result b/tests/suites/1_stateful/09_http_handler/09_0007_token.result index f46d4632414c..4d8628279c07 100644 --- a/tests/suites/1_stateful/09_http_handler/09_0007_token.result +++ b/tests/suites/1_stateful/09_http_handler/09_0007_token.result @@ -20,13 +20,13 @@ 401 {'code': 5100, 'message': 'fail to decode token(base64 decode error: Invalid padding): ' - 'bend-v1-xxx'} + 'bend-v1-s-xxx'} ---- do_query('select 4',) 401 {'code': 5101, 'message': 'session token expired'} ---- do_query('select 5',) 401 -{'code': 5103, 'message': 'session token not found'} +{'code': 5100, 'message': 'wrong data token type'} ---- do_refresh(1,) 200 ['refresh_interval_in_secs', 'refresh_token', 'session_token'] @@ -41,20 +41,18 @@ {'code': 5100, 'message': 'jwt auth not configured.'} ---- do_refresh(3,) 401 -{'code': 5100, - 'message': 'fail to decode token(base64 decode error: Invalid padding): ' - 'bend-v1-xxx'} +{'code': 5100, 'message': "unknown token type '120'"} ---- do_refresh(4,) 401 {'code': 5102, 'message': 'refresh token expired'} ---- do_refresh(5,) 401 -{'code': 5104, 'message': 'refresh token not found'} +{'code': 5100, 'message': 'wrong data token type'} ---- do_refresh(6,) 200 ---- do_logout(0,) 401 -{'code': 5103, 'message': 'session token not found'} +{'code': 5100, 'message': 'wrong data token type'} ---- do_logout(1,) 200 ---- do_query("select 'after logout'",) From 219c1813b60926ce6995c956131820fa28b99b38 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Thu, 12 Sep 2024 23:45:28 +0800 Subject: [PATCH 05/10] get session id from header. --- src/common/base/src/headers.rs | 1 + src/query/service/src/servers/http/middleware.rs | 9 ++++++++- .../src/servers/http/v1/query/http_query_context.rs | 1 + 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/common/base/src/headers.rs b/src/common/base/src/headers.rs index b05f0aa20220..db18c883948c 100644 --- a/src/common/base/src/headers.rs +++ b/src/common/base/src/headers.rs @@ -14,6 +14,7 @@ pub const HEADER_TENANT: &str = "X-DATABEND-TENANT"; pub const HEADER_QUERY_ID: &str = "X-DATABEND-QUERY-ID"; +pub const HEADER_SESSION_ID: &str = "X-DATABEND-SESSION-ID"; pub const HEADER_USER: &str = "X-DATABEND-USER"; diff --git a/src/query/service/src/servers/http/middleware.rs b/src/query/service/src/servers/http/middleware.rs index e4acd99b45ae..55c6915763a1 100644 --- a/src/query/service/src/servers/http/middleware.rs +++ b/src/query/service/src/servers/http/middleware.rs @@ -20,6 +20,7 @@ use std::time::Instant; use databend_common_base::headers::HEADER_DEDUPLICATE_LABEL; use databend_common_base::headers::HEADER_NODE_ID; use databend_common_base::headers::HEADER_QUERY_ID; +use databend_common_base::headers::HEADER_SESSION_ID; use databend_common_base::headers::HEADER_TENANT; use databend_common_base::runtime::ThreadTracker; use databend_common_config::GlobalConfig; @@ -314,7 +315,13 @@ impl HTTPSessionEndpoint { session.set_current_tenant(tenant); } - let client_session_id = self.auth_manager.auth(&mut session, &credential).await?; + let header_client_session_id = req + .headers() + .get(HEADER_SESSION_ID) + .map(|v| v.to_str().unwrap().to_string()); + let (user_name, authed_client_session_id) = + self.auth_manager.auth(&mut session, &credential, self.endpoint_kind.need_user_info()).await?; + let client_session_id = authed_client_session_id.or(header_client_session_id); if let Some(id) = client_session_id.clone() { session.set_client_session_id(id) } diff --git a/src/query/service/src/servers/http/v1/query/http_query_context.rs b/src/query/service/src/servers/http/v1/query/http_query_context.rs index dbce26eeaa75..996c1cd7feb2 100644 --- a/src/query/service/src/servers/http/v1/query/http_query_context.rs +++ b/src/query/service/src/servers/http/v1/query/http_query_context.rs @@ -43,6 +43,7 @@ pub struct HttpQueryContext { pub uri: String, pub client_host: Option, pub client_session_id: Option, + pub user_name: String, } impl HttpQueryContext { From fe0abdfd2a4d67c563d38fc7044669fad5a2d092 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 13 Sep 2024 00:04:48 +0800 Subject: [PATCH 06/10] refactor http handler auth. --- src/query/service/src/auth.rs | 22 +- .../service/src/servers/http/middleware.rs | 57 +++--- src/query/service/tests/it/auth.rs | 192 ++++++++++++------ 3 files changed, 169 insertions(+), 102 deletions(-) diff --git a/src/query/service/src/auth.rs b/src/query/service/src/auth.rs index ed39f992bf05..0b1aead610cc 100644 --- a/src/query/service/src/auth.rs +++ b/src/query/service/src/auth.rs @@ -38,7 +38,6 @@ pub struct AuthMgr { pub enum Credential { DatabendToken { token: String, - set_user: bool, }, Jwt { token: String, @@ -76,20 +75,21 @@ impl AuthMgr { &self, session: &mut Session, credential: &Credential, - ) -> Result> { + need_user_info: bool, + ) -> Result<(String, Option)> { let user_api = UserApiProvider::instance(); match credential { - Credential::NoNeed => Ok(None), - Credential::DatabendToken { token, set_user } => { + Credential::NoNeed => Ok(("".to_string(), None)), + Credential::DatabendToken { token } => { let claim = ClientSessionManager::instance().verify_token(token).await?; let tenant = Tenant::new_or_err(claim.tenant.to_string(), func_name!())?; - if *set_user { - let identity = UserIdentity::new(claim.user, "%"); + if need_user_info { + let identity = UserIdentity::new(claim.user.clone(), "%"); session.set_current_tenant(tenant.clone()); let user_info = user_api.get_user(&tenant, identity.clone()).await?; session.set_authed_user(user_info, claim.auth_role).await?; } - Ok(Some(claim.session_id)) + Ok((claim.user, Some(claim.session_id))) } Credential::Jwt { token: t, @@ -151,15 +151,15 @@ impl AuthMgr { }; session.set_authed_user(user, jwt.custom.role).await?; - Ok(None) + Ok((user_name, None)) } Credential::Password { - name: n, + name, password: p, client_ip, } => { let tenant = session.get_current_tenant(); - let identity = UserIdentity::new(n, "%"); + let identity = UserIdentity::new(name, "%"); let mut user = user_api .get_user_with_client_ip(&tenant, identity.clone(), client_ip.as_deref()) .await?; @@ -196,7 +196,7 @@ impl AuthMgr { authed?; session.set_authed_user(user, None).await?; - Ok(None) + Ok((name.to_string(), None)) } } } diff --git a/src/query/service/src/servers/http/middleware.rs b/src/query/service/src/servers/http/middleware.rs index 55c6915763a1..a0474310b346 100644 --- a/src/query/service/src/servers/http/middleware.rs +++ b/src/query/service/src/servers/http/middleware.rs @@ -68,7 +68,7 @@ use crate::servers::HttpHandlerKind; use crate::sessions::SessionManager; use crate::sessions::SessionType; -#[derive(Copy, Clone)] +#[derive(Debug, Copy, Clone)] pub enum EndpointKind { Login, Logout, @@ -80,6 +80,24 @@ pub enum EndpointKind { Verify, } +impl EndpointKind { + pub fn need_user_info(&self) -> bool { + !matches!(self, EndpointKind::NoAuth | EndpointKind::PollQuery) + } + pub fn require_databend_token_type(&self) -> Result> { + match self { + EndpointKind::Verify => Ok(None), + EndpointKind::Refresh => Ok(Some(TokenType::Refresh)), + EndpointKind::StartQuery | EndpointKind::PollQuery | EndpointKind::Logout => { + Ok(Some(TokenType::Session)) + } + _ => Err(ErrorCode::AuthenticateFailure(format!( + "should not use databend token for {self:?}", + ))), + } + } +} + const USER_AGENT: &str = "User-Agent"; const TRACE_PARENT: &str = "traceparent"; @@ -211,34 +229,12 @@ fn auth_by_header( Some(bearer) => { let token = bearer.token().to_string(); if SessionClaim::is_databend_token(&token) { - let (exp_token_type, set_user) = match endpoint_kind { - EndpointKind::Verify => (None, false), - EndpointKind::Refresh => (Some(TokenType::Refresh), true), - EndpointKind::StartQuery => (Some(TokenType::Session), true), - EndpointKind::PollQuery | EndpointKind::Logout => { - (Some(TokenType::Session), false) - } - EndpointKind::Login => { - return Err(ErrorCode::AuthenticateFailure( - "should not use databend token for login", - )); - } - EndpointKind::Clickhouse => { - return Err(ErrorCode::AuthenticateFailure( - "clickhouse handler should not use databend auth", - )); - } - EndpointKind::NoAuth => { - unreachable!() - } - }; - - if let Some(t) = exp_token_type { + if let Some(t) = endpoint_kind.require_databend_token_type()? { if t != SessionClaim::get_type(&token)? { return Err(ErrorCode::AuthenticateFailure("wrong data token type")); } } - Ok(Credential::DatabendToken { token, set_user }) + Ok(Credential::DatabendToken { token }) } else { Ok(Credential::Jwt { token, client_ip }) } @@ -319,8 +315,14 @@ impl HTTPSessionEndpoint { .headers() .get(HEADER_SESSION_ID) .map(|v| v.to_str().unwrap().to_string()); - let (user_name, authed_client_session_id) = - self.auth_manager.auth(&mut session, &credential, self.endpoint_kind.need_user_info()).await?; + let (user_name, authed_client_session_id) = self + .auth_manager + .auth( + &mut session, + &credential, + self.endpoint_kind.need_user_info(), + ) + .await?; let client_session_id = authed_client_session_id.or(header_client_session_id); if let Some(id) = client_session_id.clone() { session.set_client_session_id(id) @@ -366,6 +368,7 @@ impl HTTPSessionEndpoint { uri: req.uri().to_string(), client_host, client_session_id, + user_name, }) } } diff --git a/src/query/service/tests/it/auth.rs b/src/query/service/tests/it/auth.rs index 473c5bc421f3..ac14a84f453f 100644 --- a/src/query/service/tests/it/auth.rs +++ b/src/query/service/tests/it/auth.rs @@ -99,10 +99,14 @@ async fn test_auth_mgr_with_jwt_multi_sources() -> Result<()> { let token1 = pair1.sign(claims)?; let res = auth_mgr - .auth(&mut session, &Credential::Jwt { - token: token1, - client_ip: None, - }) + .auth( + &mut session, + &Credential::Jwt { + token: token1, + client_ip: None, + }, + true, + ) .await; assert!(res.is_ok()); @@ -158,10 +162,14 @@ async fn test_auth_mgr_with_jwt_multi_sources() -> Result<()> { ) .await?; let res2 = auth_mgr - .auth(&mut session, &Credential::Jwt { - token: token2, - client_ip: None, - }) + .auth( + &mut session, + &Credential::Jwt { + token: token2, + client_ip: None, + }, + true, + ) .await; assert!(res2.is_ok()); assert_eq!(session.get_current_user().unwrap(), user2_info); @@ -177,10 +185,14 @@ async fn test_auth_mgr_with_jwt_multi_sources() -> Result<()> { .with_subject(user3.to_string()); let token3 = pair3.sign(claims)?; let res3 = auth_mgr - .auth(&mut session, &Credential::Jwt { - token: token3, - client_ip: None, - }) + .auth( + &mut session, + &Credential::Jwt { + token: token3, + client_ip: None, + }, + true, + ) .await; assert!(res3.is_err()); assert!( @@ -317,10 +329,14 @@ async fn test_auth_mgr_with_jwt() -> Result<()> { let token = key_pair.sign(claims)?; auth_mgr - .auth(&mut session, &Credential::Jwt { - token, - client_ip: None, - }) + .auth( + &mut session, + &Credential::Jwt { + token, + client_ip: None, + }, + true, + ) .await?; let user_info = session.get_current_user()?; assert!(user_info.grants.roles().is_empty()); @@ -338,10 +354,14 @@ async fn test_auth_mgr_with_jwt() -> Result<()> { let token = key_pair.sign(claims)?; auth_mgr - .auth(&mut session, &Credential::Jwt { - token, - client_ip: None, - }) + .auth( + &mut session, + &Credential::Jwt { + token, + client_ip: None, + }, + true, + ) .await?; let user_info = session.get_current_user()?; @@ -363,10 +383,14 @@ async fn test_auth_mgr_with_jwt() -> Result<()> { let token = key_pair.sign(claims)?; let res = auth_mgr - .auth(&mut session, &Credential::Jwt { - token, - client_ip: None, - }) + .auth( + &mut session, + &Credential::Jwt { + token, + client_ip: None, + }, + true, + ) .await; assert!(res.is_ok()); @@ -386,10 +410,14 @@ async fn test_auth_mgr_with_jwt() -> Result<()> { let token = key_pair.sign(claims)?; let res = auth_mgr - .auth(&mut session, &Credential::Jwt { - token, - client_ip: None, - }) + .auth( + &mut session, + &Credential::Jwt { + token, + client_ip: None, + }, + true, + ) .await; assert!(res.is_err()); } @@ -438,10 +466,14 @@ async fn test_auth_mgr_with_jwt_es256() -> Result<()> { let token = key_pair.sign(claims)?; let res = auth_mgr - .auth(&mut session, &Credential::Jwt { - token, - client_ip: None, - }) + .auth( + &mut session, + &Credential::Jwt { + token, + client_ip: None, + }, + true, + ) .await; assert!(res.is_err()); assert!( @@ -458,10 +490,14 @@ async fn test_auth_mgr_with_jwt_es256() -> Result<()> { let token = key_pair.sign(claims)?; let res = auth_mgr - .auth(&mut session, &Credential::Jwt { - token, - client_ip: None, - }) + .auth( + &mut session, + &Credential::Jwt { + token, + client_ip: None, + }, + true, + ) .await; assert!(res.is_err()); assert!( @@ -480,10 +516,14 @@ async fn test_auth_mgr_with_jwt_es256() -> Result<()> { let token = key_pair.sign(claims)?; let res = auth_mgr - .auth(&mut session, &Credential::Jwt { - token, - client_ip: None, - }) + .auth( + &mut session, + &Credential::Jwt { + token, + client_ip: None, + }, + true, + ) .await; assert!(res.is_err()); assert!( @@ -502,10 +542,14 @@ async fn test_auth_mgr_with_jwt_es256() -> Result<()> { let token = key_pair.sign(claims)?; auth_mgr - .auth(&mut session, &Credential::Jwt { - token, - client_ip: None, - }) + .auth( + &mut session, + &Credential::Jwt { + token, + client_ip: None, + }, + true, + ) .await?; let user_info = session.get_current_user()?; assert_eq!(user_info.grants.roles().len(), 0); @@ -521,10 +565,14 @@ async fn test_auth_mgr_with_jwt_es256() -> Result<()> { let token = key_pair.sign(claims)?; auth_mgr - .auth(&mut session, &Credential::Jwt { - token, - client_ip: None, - }) + .auth( + &mut session, + &Credential::Jwt { + token, + client_ip: None, + }, + true, + ) .await?; let user_info = session.get_current_user()?; assert!(user_info.grants.roles().is_empty()); @@ -542,10 +590,14 @@ async fn test_auth_mgr_with_jwt_es256() -> Result<()> { let token = key_pair.sign(claims)?; auth_mgr - .auth(&mut session, &Credential::Jwt { - token, - client_ip: None, - }) + .auth( + &mut session, + &Credential::Jwt { + token, + client_ip: None, + }, + true, + ) .await?; let user_info = session.get_current_user()?; assert_eq!(user_info.name, user_name); @@ -566,10 +618,14 @@ async fn test_auth_mgr_with_jwt_es256() -> Result<()> { let token = key_pair.sign(claims)?; let res = auth_mgr - .auth(&mut session, &Credential::Jwt { - token, - client_ip: None, - }) + .auth( + &mut session, + &Credential::Jwt { + token, + client_ip: None, + }, + true, + ) .await; assert!(res.is_ok()); @@ -589,10 +645,14 @@ async fn test_auth_mgr_with_jwt_es256() -> Result<()> { let token = key_pair.sign(claims)?; let res = auth_mgr - .auth(&mut session, &Credential::Jwt { - token, - client_ip: None, - }) + .auth( + &mut session, + &Credential::Jwt { + token, + client_ip: None, + }, + true, + ) .await; assert!(res.is_err()); } @@ -642,10 +702,14 @@ async fn test_jwt_auth_mgr_with_management() -> Result<()> { let token = key_pair.sign(claims)?; auth_mgr - .auth(&mut session, &Credential::Jwt { - token, - client_ip: None, - }) + .auth( + &mut session, + &Credential::Jwt { + token, + client_ip: None, + }, + true, + ) .await?; let user_info = session.get_current_user()?; let current_tenant = session.get_current_tenant(); From caea4349e82651f1ca925c15595877def4becd94 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 13 Sep 2024 10:28:45 +0800 Subject: [PATCH 07/10] return need_sticky and need_refresh in session. --- .../servers/http/v1/http_query_handlers.rs | 11 ++++- .../servers/http/v1/query/execute_state.rs | 3 ++ .../src/servers/http/v1/query/http_query.rs | 48 +++++++++++++++---- .../http/v1/session/client_session_manager.rs | 5 +- .../http/v1/session/refresh_handler.rs | 4 +- 5 files changed, 57 insertions(+), 14 deletions(-) diff --git a/src/query/service/src/servers/http/v1/http_query_handlers.rs b/src/query/service/src/servers/http/v1/http_query_handlers.rs index bc1b813dd9d3..58650db2c40e 100644 --- a/src/query/service/src/servers/http/v1/http_query_handlers.rs +++ b/src/query/service/src/servers/http/v1/http_query_handlers.rs @@ -43,6 +43,7 @@ use super::query::ExecuteStateKind; use super::query::HttpQueryRequest; use super::query::HttpQueryResponseInternal; use super::query::RemoveReason; +use crate::servers::http::error::HttpErrorCode; use crate::servers::http::error::QueryError; use crate::servers::http::middleware::EndpointKind; use crate::servers::http::middleware::HTTPSessionMiddleware; @@ -230,7 +231,10 @@ async fn query_final_handler( .await? { Some(query) => { - let mut response = query.get_response_state_only().await; + let mut response = query + .get_response_state_only() + .await + .map_err(HttpErrorCode::server_error)?; // it is safe to set these 2 fields to None, because client now check for null/None first. response.session = None; response.state.affect = None; @@ -292,7 +296,10 @@ async fn query_state_handler( if let Some(reason) = query.check_removed() { Err(query_id_removed(&query_id, reason)) } else { - let response = query.get_response_state_only().await; + let response = query + .get_response_state_only() + .await + .map_err(HttpErrorCode::server_error)?; Ok(QueryResponse::from_internal(query_id, response, false)) } } diff --git a/src/query/service/src/servers/http/v1/query/execute_state.rs b/src/query/service/src/servers/http/v1/query/execute_state.rs index 2e848aa498e8..abb181cd69ca 100644 --- a/src/query/service/src/servers/http/v1/query/execute_state.rs +++ b/src/query/service/src/servers/http/v1/query/execute_state.rs @@ -27,6 +27,7 @@ use databend_common_expression::DataSchemaRef; use databend_common_expression::Scalar; use databend_common_io::prelude::FormatSettings; use databend_common_settings::Settings; +use databend_storages_common_session::TempTblMgrRef; use databend_storages_common_session::TxnManagerRef; use futures::StreamExt; use log::debug; @@ -153,6 +154,7 @@ pub struct ExecutorSessionState { pub secondary_roles: Option>, pub settings: Arc, pub txn_manager: TxnManagerRef, + pub temp_tbl_mgr: TempTblMgrRef, pub variables: HashMap, } @@ -164,6 +166,7 @@ impl ExecutorSessionState { secondary_roles: session.get_secondary_roles(), settings: session.get_settings(), txn_manager: session.txn_mgr(), + temp_tbl_mgr: session.temp_tbl_mgr(), variables: session.get_all_variables(), } } diff --git a/src/query/service/src/servers/http/v1/query/http_query.rs b/src/query/service/src/servers/http/v1/query/http_query.rs index 02990f992e8b..84a34bc9af1c 100644 --- a/src/query/service/src/servers/http/v1/query/http_query.rs +++ b/src/query/service/src/servers/http/v1/query/http_query.rs @@ -33,6 +33,7 @@ use databend_common_exception::Result; use databend_common_exception::ResultExt; use databend_common_expression::Scalar; use databend_common_io::prelude::FormatSettings; +use databend_common_meta_app::tenant::Tenant; use databend_common_metrics::http::metrics_incr_http_response_errors_count; use databend_common_settings::ScopeLevel; use databend_storages_common_session::TxnState; @@ -63,6 +64,7 @@ use crate::servers::http::v1::query::Executor; use crate::servers::http::v1::query::PageManager; use crate::servers::http::v1::query::ResponseData; use crate::servers::http::v1::query::Wait; +use crate::servers::http::v1::ClientSessionManager; use crate::servers::http::v1::HttpQueryManager; use crate::servers::http::v1::QueryResponse; use crate::servers::http::v1::QueryStats; @@ -272,6 +274,10 @@ pub struct HttpSessionConf { pub settings: Option>, #[serde(skip_serializing_if = "Option::is_none")] pub txn_state: Option, + #[serde(default)] + pub need_sticky: bool, + #[serde(default)] + pub need_refresh: bool, // used to check if the session is still on the same server #[serde(skip_serializing_if = "Option::is_none")] pub last_server_info: Option, @@ -336,6 +342,8 @@ pub enum ExpireResult { pub struct HttpQuery { pub(crate) id: String, + pub(crate) tenant: Tenant, + pub(crate) user_name: String, pub(crate) client_session_id: Option, pub(crate) session_id: String, pub(crate) node_id: String, @@ -348,6 +356,7 @@ pub struct HttpQuery { /// exceed this result_timeout_secs. pub(crate) result_timeout_secs: u64, pub(crate) is_txn_mgr_saved: AtomicBool, + pub(crate) is_session_handle_refreshed: AtomicBool, } fn try_set_txn( @@ -523,6 +532,8 @@ impl HttpQuery { }; let format_settings: Arc>> = Default::default(); let format_settings_clone = format_settings.clone(); + let tenant = session.get_current_tenant(); + let user_name = session.get_current_user()?.name; http_query_runtime_instance.runtime().try_spawn( async move { let state = state_clone.clone(); @@ -565,6 +576,8 @@ impl HttpQuery { let query = HttpQuery { id: query_id, + tenant, + user_name, client_session_id: http_ctx.client_session_id.clone(), session_id, node_id, @@ -574,6 +587,7 @@ impl HttpQuery { result_timeout_secs, expire_state: Arc::new(parking_lot::Mutex::new(ExpireState::Working)), is_txn_mgr_saved: AtomicBool::new(false), + is_session_handle_refreshed: AtomicBool::new(false), }; Ok(Arc::new(query)) @@ -584,7 +598,7 @@ impl HttpQuery { pub async fn get_response_page(&self, page_no: usize) -> Result { let data = Some(self.get_page(page_no).await?); let state = self.get_state().await; - let session = self.get_response_session().await; + let session = self.get_response_session().await?; Ok(HttpQueryResponseInternal { data, @@ -596,17 +610,17 @@ impl HttpQuery { } #[async_backtrace::framed] - pub async fn get_response_state_only(&self) -> HttpQueryResponseInternal { + pub async fn get_response_state_only(&self) -> Result { let state = self.get_state().await; - let session = self.get_response_session().await; + let session = self.get_response_session().await?; - HttpQueryResponseInternal { + Ok(HttpQueryResponseInternal { data: None, session_id: self.session_id.clone(), node_id: self.node_id.clone(), state, session: Some(session), - } + }) } #[async_backtrace::framed] @@ -616,7 +630,7 @@ impl HttpQuery { } #[async_backtrace::framed] - async fn get_response_session(&self) -> HttpSessionConf { + async fn get_response_session(&self) -> Result { let keep_server_session_secs = self .request .session @@ -648,6 +662,21 @@ impl HttpQuery { } else { None }; + let mut need_sticky = false; + let (need_refresh, just_changed) = session_state.temp_tbl_mgr.lock().is_empty(); + if let Some(cid) = &self.client_session_id { + if just_changed + && self + .is_session_handle_refreshed + .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed) + .is_ok() + { + ClientSessionManager::instance() + .refresh_session_handle(self.tenant.clone(), self.user_name.to_string(), cid) + .await?; + } + } + if txn_state != TxnState::AutoCommit && !self.is_txn_mgr_saved.load(Ordering::Relaxed) && matches!(executor.state, ExecuteState::Stopped(_)) @@ -656,6 +685,7 @@ impl HttpQuery { .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed) .is_ok() { + need_sticky = true; let timeout = session_state .settings .get_idle_transaction_timeout_secs() @@ -664,17 +694,19 @@ impl HttpQuery { .add_txn(self.id.clone(), session_state.txn_manager.clone(), timeout) .await; } - HttpSessionConf { + Ok(HttpSessionConf { database: Some(database), role, secondary_roles, keep_server_session_secs, settings: Some(settings), txn_state: Some(txn_state), + need_sticky, + need_refresh, last_server_info: Some(HttpQueryManager::instance().server_info.clone()), last_query_ids: vec![self.id.clone()], internal, - } + }) } #[async_backtrace::framed] diff --git a/src/query/service/src/servers/http/v1/session/client_session_manager.rs b/src/query/service/src/servers/http/v1/session/client_session_manager.rs index e04f2022f745..e81e4dbea719 100644 --- a/src/query/service/src/servers/http/v1/session/client_session_manager.rs +++ b/src/query/service/src/servers/http/v1/session/client_session_manager.rs @@ -161,11 +161,10 @@ impl ClientSessionManager { pub async fn refresh_session_handle( &self, - session: &Arc, + tenant: Tenant, + user_name: String, client_session_id: &str, ) -> Result<()> { - let tenant = session.get_current_tenant(); - let user_name = session.get_current_user()?.name; let client_session_api = UserApiProvider::instance().client_session_api(&tenant); client_session_api .upsert_client_session_id( diff --git a/src/query/service/src/servers/http/v1/session/refresh_handler.rs b/src/query/service/src/servers/http/v1/session/refresh_handler.rs index ace0ba993c4f..91980518bdf5 100644 --- a/src/query/service/src/servers/http/v1/session/refresh_handler.rs +++ b/src/query/service/src/servers/http/v1/session/refresh_handler.rs @@ -54,7 +54,9 @@ pub async fn refresh_handler( "JWT session should provide session_id when refresh session", )))?; mgr.refresh_in_memory_states(&session_id, &ctx.session); - mgr.refresh_session_handle(&ctx.session, &session_id) + + let tenant = ctx.session.get_current_tenant(); + mgr.refresh_session_handle(tenant, ctx.user_name.clone(), &session_id) .await .map_err(HttpErrorCode::server_error)?; Ok(Json(RefreshResponse { From 27d03974738288977191aacd633fc5829d3ab6a4 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 13 Sep 2024 10:46:30 +0800 Subject: [PATCH 08/10] fix tests. --- tests/suites/1_stateful/09_http_handler/09_0007_token.result | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/suites/1_stateful/09_http_handler/09_0007_token.result b/tests/suites/1_stateful/09_http_handler/09_0007_token.result index 4d8628279c07..944b5a94063b 100644 --- a/tests/suites/1_stateful/09_http_handler/09_0007_token.result +++ b/tests/suites/1_stateful/09_http_handler/09_0007_token.result @@ -41,7 +41,7 @@ {'code': 5100, 'message': 'jwt auth not configured.'} ---- do_refresh(3,) 401 -{'code': 5100, 'message': "unknown token type '120'"} +{'code': 5100, 'message': "invalid token type 'x'"} ---- do_refresh(4,) 401 {'code': 5102, 'message': 'refresh token expired'} From 19f87fc25c74286c6cb69b4ae73a7d1ce807df3f Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 13 Sep 2024 21:23:13 +0800 Subject: [PATCH 09/10] fix. --- src/query/service/src/auth.rs | 6 + .../service/src/interpreters/interpreter.rs | 7 - .../servers/http/v1/http_query_handlers.rs | 9 ++ .../servers/http/v1/query/execute_state.rs | 56 ++++--- .../src/servers/http/v1/query/http_query.rs | 142 +++++++++++------- .../http/v1/query/http_query_manager.rs | 47 ++++++ .../http/v1/session/client_session_manager.rs | 32 ++-- .../servers/http/v1/session/login_handler.rs | 5 +- .../http/v1/session/refresh_handler.rs | 2 +- .../09_http_handler/09_0007_token.py | 12 ++ .../09_http_handler/09_0007_token.result | 10 ++ 11 files changed, 207 insertions(+), 121 deletions(-) diff --git a/src/query/service/src/auth.rs b/src/query/service/src/auth.rs index 0b1aead610cc..5928dbd9d12c 100644 --- a/src/query/service/src/auth.rs +++ b/src/query/service/src/auth.rs @@ -51,6 +51,12 @@ pub enum Credential { NoNeed, } +impl Credential { + pub fn need_refresh(&self) -> bool { + matches!(self, Credential::DatabendToken { .. }) + } +} + impl AuthMgr { pub fn init(cfg: &InnerConfig) -> Result<()> { GlobalInstance::set(AuthMgr::create(cfg)); diff --git a/src/query/service/src/interpreters/interpreter.rs b/src/query/service/src/interpreters/interpreter.rs index 1792b7fbe0ea..fa1c9e311377 100644 --- a/src/query/service/src/interpreters/interpreter.rs +++ b/src/query/service/src/interpreters/interpreter.rs @@ -53,10 +53,8 @@ use crate::pipelines::executor::PipelineCompleteExecutor; use crate::pipelines::executor::PipelinePullingExecutor; use crate::pipelines::PipelineBuildResult; use crate::schedulers::ServiceQueryExecutor; -use crate::servers::http::v1::ClientSessionManager; use crate::sessions::QueryContext; use crate::sessions::SessionManager; -use crate::sessions::SessionType; use crate::stream::DataBlockStream; use crate::stream::ProgressStream; use crate::stream::PullingExecutorStream; @@ -191,11 +189,6 @@ fn log_query_finished(ctx: &QueryContext, error: Option, has_profiles let typ = session.get_type(); if typ.is_user_session() { SessionManager::instance().status.write().query_finish(now); - if typ == SessionType::HTTPQuery { - if let Some(cid) = session.get_client_session_id() { - ClientSessionManager::instance().on_query_finish(&cid, &session) - } - } } if let Err(error) = InterpreterQueryLog::log_finish(ctx, now, error, has_profiles) { diff --git a/src/query/service/src/servers/http/v1/http_query_handlers.rs b/src/query/service/src/servers/http/v1/http_query_handlers.rs index 58650db2c40e..4dc9f09c83b2 100644 --- a/src/query/service/src/servers/http/v1/http_query_handlers.rs +++ b/src/query/service/src/servers/http/v1/http_query_handlers.rs @@ -323,6 +323,15 @@ async fn query_page_handler( let http_query_manager = HttpQueryManager::instance(); match http_query_manager.get_query(&query_id) { Some(query) => { + if query.user_name != ctx.user_name { + return Err(poem::error::Error::from_string( + format!( + "wrong user, query {} expect {}, got {}", + query_id, query.user_name, ctx.user_name + ), + StatusCode::UNAUTHORIZED, + )); + } query.check_client_session_id(&ctx.client_session_id)?; if let Some(reason) = query.check_removed() { Err(query_id_removed(&query_id, reason)) diff --git a/src/query/service/src/servers/http/v1/query/execute_state.rs b/src/query/service/src/servers/http/v1/query/execute_state.rs index abb181cd69ca..779e22dc9e29 100644 --- a/src/query/service/src/servers/http/v1/query/execute_state.rs +++ b/src/query/service/src/servers/http/v1/query/execute_state.rs @@ -44,7 +44,6 @@ use crate::interpreters::InterpreterQueryLog; use crate::servers::http::v1::http_query_handlers::QueryResponseField; use crate::servers::http::v1::query::http_query::ResponseState; use crate::servers::http::v1::query::sized_spsc::SizedChannelSender; -use crate::servers::http::v1::ClientSessionManager; use crate::sessions::AcquireQueueGuard; use crate::sessions::QueriesQueueManager; use crate::sessions::QueryAffect; @@ -261,26 +260,14 @@ impl Executor { #[async_backtrace::framed] pub async fn stop(this: &Arc>, reason: Result<(), C>) { let reason = reason.with_context(|| "execution stopped"); + let mut guard = this.write().await; - { - let guard = this.read().await; - if let Stopped(s) = &guard.state { - debug!( - "{}: http query already stopped, reason {:?}, new reason {:?}", - &guard.query_id, s.reason, reason - ); - return; - } else { + let state = match &guard.state { + Starting(s) => { info!( - "{}: http query change state to Stopped, reason {:?}", + "{}: http query begin changing state from Staring to Stopped, reason {:?}", &guard.query_id, reason ); - } - } - - let mut guard = this.write().await; - match &guard.state { - Starting(s) => { if let Err(e) = &reason { InterpreterQueryLog::log_finish( &s.ctx, @@ -295,38 +282,52 @@ impl Executor { s.ctx.get_current_session().txn_mgr().lock().set_fail(); } } - guard.state = Stopped(Box::new(ExecuteStopped { + ExecuteStopped { stats: Default::default(), schema: vec![], has_result_set: None, - reason, + reason: reason.clone(), session_state: ExecutorSessionState::new(s.ctx.get_current_session()), query_duration_ms: s.ctx.get_query_duration_ms(), warnings: s.ctx.pop_warnings(), affect: Default::default(), - })) + } } Running(r) => { + info!( + "{}: http query changing state from Running to Stopped, reason {:?}", + &guard.query_id, reason + ); if let Err(e) = &reason { if e.code() != ErrorCode::CLOSED_QUERY { r.session.txn_mgr().lock().set_fail(); } r.session.force_kill_query(e.clone()); } - - guard.state = Stopped(Box::new(ExecuteStopped { + ExecuteStopped { stats: Progresses::from_context(&r.ctx), schema: r.schema.clone(), has_result_set: Some(r.has_result_set), - reason, + reason: reason.clone(), session_state: ExecutorSessionState::new(r.ctx.get_current_session()), query_duration_ms: r.ctx.get_query_duration_ms(), warnings: r.ctx.pop_warnings(), affect: r.ctx.get_affect(), - })) + } } - Stopped(_) => {} - } + Stopped(s) => { + debug!( + "{}: http query already stopped, reason {:?}, new reason {:?}", + &guard.query_id, s.reason, reason + ); + return; + } + }; + info!( + "{}: http query has change state to Stopped, reason {:?}", + &guard.query_id, reason + ); + guard.state = Stopped(Box::new(state)); } } @@ -343,9 +344,6 @@ impl ExecuteState { let make_error = || format!("failed to start query: {sql}"); info!("http query prepare to plan sql"); - if let Some(cid) = session.get_client_session_id() { - ClientSessionManager::instance().on_query_start(&cid, &session) - } // Use interpreter_plan_sql, we can write the query log if an error occurs. let (plan, extras) = interpreter_plan_sql(ctx.clone(), &sql) diff --git a/src/query/service/src/servers/http/v1/query/http_query.rs b/src/query/service/src/servers/http/v1/query/http_query.rs index 84a34bc9af1c..8641e665392f 100644 --- a/src/query/service/src/servers/http/v1/query/http_query.rs +++ b/src/query/service/src/servers/http/v1/query/http_query.rs @@ -41,6 +41,7 @@ use fastrace::prelude::*; use http::StatusCode; use log::info; use log::warn; +use parking_lot::Mutex; use poem::web::Json; use poem::IntoResponse; use serde::Deserialize; @@ -355,7 +356,12 @@ pub struct HttpQuery { /// should fetch the paginated result in a timely manner, and the interval should not /// exceed this result_timeout_secs. pub(crate) result_timeout_secs: u64, + + pub(crate) need_refresh: bool, pub(crate) is_txn_mgr_saved: AtomicBool, + + pub(crate) has_temp_table_before_run: bool, + pub(crate) has_temp_table_after_run: Mutex>, pub(crate) is_session_handle_refreshed: AtomicBool, } @@ -367,25 +373,7 @@ fn try_set_txn( ) -> Result<()> { match &session_conf.txn_state { Some(TxnState::Active) => { - if let Some(ServerInfo { id, start_time }) = &session_conf.last_server_info { - if http_query_manager.server_info.id != *id { - return Err(ErrorCode::InvalidSessionState(format!( - "transaction is active, but the request routed to the wrong server: current server is {}, the last is {}.", - http_query_manager.server_info.id, id - ))); - } - if http_query_manager.server_info.start_time != *start_time { - return Err(ErrorCode::CurrentTransactionIsAborted(format!( - "transaction is aborted because server restarted at {}.", - start_time - ))); - } - } else { - return Err(ErrorCode::InvalidSessionState( - "transaction is active but missing server_info".to_string(), - )); - } - + http_query_manager.check_sticky_for_txn(&session_conf.last_server_info)?; let last_query_id = session_conf.last_query_ids.first().ok_or_else(|| { ErrorCode::InvalidSessionState( "transaction is active but last_query_ids is empty".to_string(), @@ -419,7 +407,7 @@ impl HttpQuery { request: HttpQueryRequest, ) -> Result> { let http_query_manager = HttpQueryManager::instance(); - + let need_refresh = ctx.credential.need_refresh(); let session = ctx .upgrade_session(SessionType::HTTPQuery) .map_err(|err| ErrorCode::Internal(format!("{err}")))?; @@ -463,6 +451,11 @@ impl HttpQuery { } } try_set_txn(&ctx.query_id, &session, session_conf, &http_query_manager)?; + if session_conf.need_sticky + && matches!(session_conf.txn_state, None | Some(TxnState::AutoCommit)) + { + http_query_manager.check_sticky_for_temp_table(&session_conf.last_server_info)?; + } }; let settings = session.get_settings(); @@ -534,6 +527,13 @@ impl HttpQuery { let format_settings_clone = format_settings.clone(); let tenant = session.get_current_tenant(); let user_name = session.get_current_user()?.name; + + let has_temp_table_before_run = if let Some(cid) = session.get_client_session_id() { + ClientSessionManager::instance().on_query_start(&cid, &session); + true + } else { + false + }; http_query_runtime_instance.runtime().try_spawn( async move { let state = state_clone.clone(); @@ -585,9 +585,15 @@ impl HttpQuery { state, page_manager: data, result_timeout_secs, - expire_state: Arc::new(parking_lot::Mutex::new(ExpireState::Working)), - is_txn_mgr_saved: AtomicBool::new(false), - is_session_handle_refreshed: AtomicBool::new(false), + + expire_state: Arc::new(Mutex::new(ExpireState::Working)), + + need_refresh, + has_temp_table_before_run, + + is_txn_mgr_saved: Default::default(), + has_temp_table_after_run: Default::default(), + is_session_handle_refreshed: Default::default(), }; Ok(Arc::new(query)) @@ -612,14 +618,13 @@ impl HttpQuery { #[async_backtrace::framed] pub async fn get_response_state_only(&self) -> Result { let state = self.get_state().await; - let session = self.get_response_session().await?; Ok(HttpQueryResponseInternal { data: None, session_id: self.session_id.clone(), node_id: self.node_id.clone(), state, - session: Some(session), + session: None, }) } @@ -631,13 +636,6 @@ impl HttpQuery { #[async_backtrace::framed] async fn get_response_session(&self) -> Result { - let keep_server_session_secs = self - .request - .session - .clone() - .map(|v| v.keep_server_session_secs) - .unwrap_or(None); - // reply the updated session state, includes: // - current_database: updated by USE XXX; // - role: updated by SET ROLE; @@ -662,43 +660,71 @@ impl HttpQuery { } else { None }; - let mut need_sticky = false; - let (need_refresh, just_changed) = session_state.temp_tbl_mgr.lock().is_empty(); - if let Some(cid) = &self.client_session_id { - if just_changed + + if matches!(executor.state, ExecuteState::Stopped(_)) { + if let Some(cid) = &self.client_session_id { + let (has_temp_table_after_run, just_changed) = { + let mut guard = self.has_temp_table_after_run.lock(); + match *guard { + None => { + let not_empty = !session_state.temp_tbl_mgr.lock().is_empty().0; + *guard = Some(not_empty); + ClientSessionManager::instance().on_query_finish( + cid, + session_state.temp_tbl_mgr, + !not_empty, + not_empty != self.has_temp_table_before_run, + ); + (not_empty, true) + } + Some(v) => (v, false), + } + }; + + if !self.has_temp_table_before_run + && has_temp_table_after_run + && just_changed + && self + .is_session_handle_refreshed + .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed) + .is_ok() + { + ClientSessionManager::instance() + .refresh_session_handle( + self.tenant.clone(), + self.user_name.to_string(), + cid, + ) + .await?; + } + } + + if txn_state != TxnState::AutoCommit + && !self.is_txn_mgr_saved.load(Ordering::Relaxed) && self - .is_session_handle_refreshed + .is_txn_mgr_saved .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed) .is_ok() { - ClientSessionManager::instance() - .refresh_session_handle(self.tenant.clone(), self.user_name.to_string(), cid) - .await?; + let timeout = session_state + .settings + .get_idle_transaction_timeout_secs() + .unwrap(); + HttpQueryManager::instance() + .add_txn(self.id.clone(), session_state.txn_manager.clone(), timeout) + .await; } } + let has_temp_table = (*self.has_temp_table_after_run.lock()).unwrap_or(false); + + let need_sticky = txn_state != TxnState::AutoCommit || has_temp_table; + let need_refresh = self.need_refresh || has_temp_table; - if txn_state != TxnState::AutoCommit - && !self.is_txn_mgr_saved.load(Ordering::Relaxed) - && matches!(executor.state, ExecuteState::Stopped(_)) - && self - .is_txn_mgr_saved - .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed) - .is_ok() - { - need_sticky = true; - let timeout = session_state - .settings - .get_idle_transaction_timeout_secs() - .unwrap(); - HttpQueryManager::instance() - .add_txn(self.id.clone(), session_state.txn_manager.clone(), timeout) - .await; - } Ok(HttpSessionConf { database: Some(database), role, secondary_roles, - keep_server_session_secs, + keep_server_session_secs: None, settings: Some(settings), txn_state: Some(txn_state), need_sticky, diff --git a/src/query/service/src/servers/http/v1/query/http_query_manager.rs b/src/query/service/src/servers/http/v1/query/http_query_manager.rs index 424f6a0aeaa1..12a186d77ed2 100644 --- a/src/query/service/src/servers/http/v1/query/http_query_manager.rs +++ b/src/query/service/src/servers/http/v1/query/http_query_manager.rs @@ -230,4 +230,51 @@ impl HttpQueryManager { None } } + + pub(crate) fn check_sticky_for_txn(&self, last_server_info: &Option) -> Result<()> { + if let Some(ServerInfo { id, start_time }) = last_server_info { + if self.server_info.id != *id { + return Err(ErrorCode::InvalidSessionState(format!( + "transaction is active, but the request routed to the wrong server: current server is {}, the last is {}.", + self.server_info.id, id + ))); + } + if self.server_info.start_time != *start_time { + return Err(ErrorCode::CurrentTransactionIsAborted(format!( + "transaction is aborted because server restarted at {}.", + start_time + ))); + } + } else { + return Err(ErrorCode::InvalidSessionState( + "transaction is active but missing server_info".to_string(), + )); + } + Ok(()) + } + + pub(crate) fn check_sticky_for_temp_table( + &self, + last_server_info: &Option, + ) -> Result<()> { + if let Some(ServerInfo { id, start_time }) = last_server_info { + if self.server_info.id != *id { + return Err(ErrorCode::InvalidSessionState(format!( + "there are temp tables in session, but the request routed to the wrong server: current server is {}, the last is {}.", + self.server_info.id, id + ))); + } + if self.server_info.start_time != *start_time { + return Err(ErrorCode::InvalidSessionState(format!( + "temp table lost because server restarted at {}.", + start_time + ))); + } + } else { + return Err(ErrorCode::InvalidSessionState( + "there are temp tables in session, but missing field server_info".to_string(), + )); + } + Ok(()) + } } diff --git a/src/query/service/src/servers/http/v1/session/client_session_manager.rs b/src/query/service/src/servers/http/v1/session/client_session_manager.rs index e81e4dbea719..294dc8e66f57 100644 --- a/src/query/service/src/servers/http/v1/session/client_session_manager.rs +++ b/src/query/service/src/servers/http/v1/session/client_session_manager.rs @@ -144,21 +144,6 @@ impl ClientSessionManager { } } - pub async fn new_session_id_for_jwt(&self, session: &Arc) -> Result { - let client_session_id = uuid::Uuid::new_v4().to_string(); - let tenant = session.get_current_tenant(); - let user_name = session.get_current_user()?.name; - let client_session_api = UserApiProvider::instance().client_session_api(&tenant); - client_session_api - .upsert_client_session_id( - &client_session_id, - ClientSession { user_name }, - REFRESH_TOKEN_TTL + TTL_GRACE_PERIOD_META, - ) - .await?; - Ok(client_session_id) - } - pub async fn refresh_session_handle( &self, tenant: Tenant, @@ -191,7 +176,7 @@ impl ClientSessionManager { let user = session.get_current_user()?.name; let auth_role = session.privilege_mgr().get_auth_role(); let client_session_id = if let Some(old) = &old_refresh_token { - let (claim, _) = SessionClaim::decode(&old)?; + let (claim, _) = SessionClaim::decode(old)?; assert_eq!(tenant_name, claim.tenant); assert_eq!(user, claim.user); assert_eq!(auth_role, claim.auth_role); @@ -229,7 +214,7 @@ impl ClientSessionManager { client_session_api .upsert_token(&old, token_info, TOMBSTONE_TTL, true) .await?; - self.refresh_in_memory_states(&client_session_id, session); + self.refresh_in_memory_states(&client_session_id); }; self.refresh_tokens .write() @@ -358,11 +343,10 @@ impl ClientSessionManager { Ok(()) } - pub fn refresh_in_memory_states(&self, client_session_id: &str, session: &Arc) { + pub fn refresh_in_memory_states(&self, client_session_id: &str) { let mut guard = self.session_state.lock(); guard.entry(client_session_id.to_string()).and_modify(|e| { e.query_state = QueryState::InUse; - session.set_temp_tbl_mgr(e.temp_tbl_mgr.clone()) }); } @@ -375,9 +359,13 @@ impl ClientSessionManager { } }); } - pub fn on_query_finish(&self, client_session_id: &str, session: &Arc) { - let temp_tbl_mgr = session.temp_tbl_mgr(); - let (is_empty, just_changed) = temp_tbl_mgr.lock().is_empty(); + pub fn on_query_finish( + &self, + client_session_id: &str, + temp_tbl_mgr: TempTblMgrRef, + is_empty: bool, + just_changed: bool, + ) { if !is_empty || just_changed { let mut guard = self.session_state.lock(); match guard.entry(client_session_id.to_string()) { diff --git a/src/query/service/src/servers/http/v1/session/login_handler.rs b/src/query/service/src/servers/http/v1/session/login_handler.rs index 14069ea14616..4de42eea2570 100644 --- a/src/query/service/src/servers/http/v1/session/login_handler.rs +++ b/src/query/service/src/servers/http/v1/session/login_handler.rs @@ -89,10 +89,7 @@ pub async fn login_handler( match ctx.credential { Credential::Jwt { .. } => { - let session_id = ClientSessionManager::instance() - .new_session_id_for_jwt(&ctx.session) - .await - .map_err(HttpErrorCode::server_error)?; + let session_id = uuid::Uuid::new_v4().to_string(); Ok(Json(LoginResponse { version, session_id, diff --git a/src/query/service/src/servers/http/v1/session/refresh_handler.rs b/src/query/service/src/servers/http/v1/session/refresh_handler.rs index 91980518bdf5..84b0bac34462 100644 --- a/src/query/service/src/servers/http/v1/session/refresh_handler.rs +++ b/src/query/service/src/servers/http/v1/session/refresh_handler.rs @@ -53,7 +53,7 @@ pub async fn refresh_handler( .ok_or(HttpErrorCode::bad_request(ErrorCode::BadArguments( "JWT session should provide session_id when refresh session", )))?; - mgr.refresh_in_memory_states(&session_id, &ctx.session); + mgr.refresh_in_memory_states(&session_id); let tenant = ctx.session.get_current_tenant(); mgr.refresh_session_handle(tenant, ctx.user_name.clone(), &session_id) diff --git a/tests/suites/1_stateful/09_http_handler/09_0007_token.py b/tests/suites/1_stateful/09_http_handler/09_0007_token.py index cba2bf4f2870..4ed83ac7bc1b 100755 --- a/tests/suites/1_stateful/09_http_handler/09_0007_token.py +++ b/tests/suites/1_stateful/09_http_handler/09_0007_token.py @@ -131,6 +131,9 @@ def main(): # ok query_resp = do_query("select 1", session_token) pprint(query_resp.get("data")) + pprint(query_resp.get("session").get("need_sticky")) + pprint(query_resp.get("session").get("need_refresh")) + # cluster query_resp = do_query("select count(*) from system.clusters", session_token) @@ -141,6 +144,15 @@ def main(): query_resp = do_query("select 'cluster'", session_token, url) pprint(query_resp.get("data")) + # temp table + query_resp = do_query("CREATE TEMP TABLE t(c1 int)", session_token) + pprint(query_resp.get("session").get("need_sticky")) + pprint(query_resp.get("session").get("need_refresh")) + + query_resp = do_query("drop TABLE t", session_token) + pprint(query_resp.get("session").get("need_sticky")) + pprint(query_resp.get("session").get("need_refresh")) + # errors do_query("select 2", "xxx") do_query("select 3", "bend-v1-s-xxx") diff --git a/tests/suites/1_stateful/09_http_handler/09_0007_token.result b/tests/suites/1_stateful/09_http_handler/09_0007_token.result index 944b5a94063b..38ed846a2955 100644 --- a/tests/suites/1_stateful/09_http_handler/09_0007_token.result +++ b/tests/suites/1_stateful/09_http_handler/09_0007_token.result @@ -8,11 +8,21 @@ ---- do_query('select 1',) 200 [['1']] +False +True ---- do_query('select count(*) from system.clusters',) 200 ---- do_query("select 'cluster'",) 200 [['cluster']] +---- do_query('CREATE TEMP TABLE t(c1 int)',) +200 +True +True +---- do_query('drop TABLE t',) +200 +False +True ---- do_query('select 2',) 401 {'code': 5100, 'message': 'jwt auth not configured.'} From 54e8fcd1cd1f3c2eb4c77cb09d8a5c88c6352c20 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Sat, 14 Sep 2024 01:21:27 +0800 Subject: [PATCH 10/10] fix. --- src/query/service/tests/it/auth.rs | 60 ++++++++++++------- .../it/servers/http/http_query_handlers.rs | 10 ++++ 2 files changed, 50 insertions(+), 20 deletions(-) diff --git a/src/query/service/tests/it/auth.rs b/src/query/service/tests/it/auth.rs index ac14a84f453f..d0fe9c73202e 100644 --- a/src/query/service/tests/it/auth.rs +++ b/src/query/service/tests/it/auth.rs @@ -128,10 +128,14 @@ async fn test_auth_mgr_with_jwt_multi_sources() -> Result<()> { .with_subject(user2.to_string()); let token2 = pair2.sign(claims)?; let res = auth_mgr - .auth(&mut session, &Credential::Jwt { - token: token2, - client_ip: None, - }) + .auth( + &mut session, + &Credential::Jwt { + token: token2, + client_ip: None, + }, + true, + ) .await; assert!(res.is_ok()); @@ -245,10 +249,14 @@ async fn test_auth_mgr_with_jwt() -> Result<()> { let token = key_pair.sign(claims)?; let res = auth_mgr - .auth(&mut session, &Credential::Jwt { - token, - client_ip: None, - }) + .auth( + &mut session, + &Credential::Jwt { + token, + client_ip: None, + }, + true, + ) .await; assert!(res.is_err()); @@ -266,10 +274,14 @@ async fn test_auth_mgr_with_jwt() -> Result<()> { let token = key_pair.sign(claims)?; let res = auth_mgr - .auth(&mut session, &Credential::Jwt { - token, - client_ip: None, - }) + .auth( + &mut session, + &Credential::Jwt { + token, + client_ip: None, + }, + true, + ) .await; assert!(res.is_err()); assert!( @@ -288,10 +300,14 @@ async fn test_auth_mgr_with_jwt() -> Result<()> { let token = key_pair.sign(claims)?; let res = auth_mgr - .auth(&mut session, &Credential::Jwt { - token, - client_ip: None, - }) + .auth( + &mut session, + &Credential::Jwt { + token, + client_ip: None, + }, + true, + ) .await; assert!(res.is_err()); assert!( @@ -310,10 +326,14 @@ async fn test_auth_mgr_with_jwt() -> Result<()> { let token = key_pair.sign(claims)?; auth_mgr - .auth(&mut session, &Credential::Jwt { - token, - client_ip: None, - }) + .auth( + &mut session, + &Credential::Jwt { + token, + client_ip: None, + }, + true, + ) .await?; let user_info = session.get_current_user()?; assert_eq!(user_info.grants.roles().len(), 0); diff --git a/src/query/service/tests/it/servers/http/http_query_handlers.rs b/src/query/service/tests/it/servers/http/http_query_handlers.rs index 6fa2fd8f5e5d..23d611edfbc4 100644 --- a/src/query/service/tests/it/servers/http/http_query_handlers.rs +++ b/src/query/service/tests/it/servers/http/http_query_handlers.rs @@ -1391,6 +1391,8 @@ async fn test_affect() -> Result<()> { ("timezone".to_string(), "Asia/Shanghai".to_string()), ])), txn_state: Some(TxnState::AutoCommit), + need_sticky: false, + need_refresh: false, last_server_info: None, last_query_ids: vec![], internal: None, @@ -1414,6 +1416,8 @@ async fn test_affect() -> Result<()> { "6".to_string(), )])), txn_state: Some(TxnState::AutoCommit), + need_sticky: false, + need_refresh: false, last_server_info: None, last_query_ids: vec![], internal: None, @@ -1432,6 +1436,8 @@ async fn test_affect() -> Result<()> { "6".to_string(), )])), txn_state: Some(TxnState::AutoCommit), + need_sticky: false, + need_refresh: false, last_server_info: None, last_query_ids: vec![], internal: None, @@ -1452,6 +1458,8 @@ async fn test_affect() -> Result<()> { "6".to_string(), )])), txn_state: Some(TxnState::AutoCommit), + need_sticky: false, + need_refresh: false, last_server_info: None, last_query_ids: vec![], internal: None, @@ -1474,6 +1482,8 @@ async fn test_affect() -> Result<()> { "Asia/Shanghai".to_string(), )])), txn_state: Some(TxnState::AutoCommit), + need_sticky: false, + need_refresh: false, last_server_info: None, last_query_ids: vec![], internal: None,