From dca4ea3c12866f2969fe5bc81f7615df92379547 Mon Sep 17 00:00:00 2001 From: flaneur Date: Tue, 6 Aug 2024 20:09:43 +0800 Subject: [PATCH] feat(core): Add route hint, make load balancer easier to route queries (#469) * add RouteHintGenerator * add in_active_transaction * reset route hint * chore: rename to route hint * handle the route hint from query response --- core/src/client.rs | 67 ++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 62 insertions(+), 5 deletions(-) diff --git a/core/src/client.rs b/core/src/client.rs index 1cfadb9f..5f90db51 100644 --- a/core/src/client.rs +++ b/core/src/client.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::BTreeMap; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -41,6 +42,8 @@ const HEADER_QUERY_ID: &str = "X-DATABEND-QUERY-ID"; const HEADER_TENANT: &str = "X-DATABEND-TENANT"; const HEADER_WAREHOUSE: &str = "X-DATABEND-WAREHOUSE"; const HEADER_STAGE_NAME: &str = "X-DATABEND-STAGE-NAME"; +const HEADER_ROUTE_HINT: &str = "X-DATABEND-ROUTE-HINT"; +const TXN_STATE_ACTIVE: &str = "Active"; static VERSION: Lazy = Lazy::new(|| { let version = option_env!("CARGO_PKG_VERSION").unwrap_or("unknown"); @@ -61,6 +64,7 @@ pub struct APIClient { tenant: Option, warehouse: Arc>>, session_state: Arc>, + route_hint: Arc, wait_time_secs: Option, max_rows_in_buffer: Option, @@ -241,6 +245,15 @@ impl APIClient { guard.role.clone() } + async fn in_active_transaction(&self) -> bool { + let guard = self.session_state.lock().await; + guard + .txn_state + .as_ref() + .map(|s| s.eq_ignore_ascii_case(TXN_STATE_ACTIVE)) + .unwrap_or(false) + } + pub fn username(&self) -> String { self.auth.username() } @@ -280,6 +293,9 @@ impl APIClient { pub async fn start_query(&self, sql: &str) -> Result { info!("start query: {}", sql); + if !self.in_active_transaction().await { + self.route_hint.next(); + } let session_state = self.session_state().await; let req = QueryRequest::new(sql) .with_pagination(self.make_pagination()) @@ -308,13 +324,16 @@ impl APIClient { ))); } - let resp: QueryResponse = resp.json().await?; - self.handle_session(&resp.session).await; - if let Some(err) = resp.error { + if let Some(route_hint) = resp.headers().get(HEADER_ROUTE_HINT) { + self.route_hint.set(route_hint.to_str().unwrap_or_default()); + } + let result: QueryResponse = resp.json().await?; + self.handle_session(&result.session).await; + if let Some(err) = result.error { return Err(Error::InvalidResponse(err)); } - self.handle_warnings(&resp); - Ok(resp) + self.handle_warnings(&result); + Ok(result) } pub async fn query_page(&self, query_id: &str, next_uri: &str) -> Result { @@ -434,6 +453,8 @@ impl APIClient { if let Some(warehouse) = &*warehouse { headers.insert(HEADER_WAREHOUSE, warehouse.parse()?); } + let route_hint = self.route_hint.current(); + headers.insert(HEADER_ROUTE_HINT, route_hint.parse()?); headers.insert(HEADER_QUERY_ID, query_id.parse()?); Ok(headers) } @@ -588,10 +609,46 @@ impl Default for APIClient { page_request_timeout: Duration::from_secs(30), tls_ca_file: None, presign: PresignMode::Auto, + route_hint: Arc::new(RouteHintGenerator::new()), } } } +struct RouteHintGenerator { + nonce: AtomicU64, + current: std::sync::Mutex, +} + +impl RouteHintGenerator { + fn new() -> Self { + let gen = Self { + nonce: AtomicU64::new(0), + current: std::sync::Mutex::new("".to_string()), + }; + gen.next(); + gen + } + + fn current(&self) -> String { + let guard = self.current.lock().unwrap(); + guard.clone() + } + + fn set(&self, hint: &str) { + let mut guard = self.current.lock().unwrap(); + *guard = hint.to_string(); + } + + fn next(&self) -> String { + let nonce = self.nonce.fetch_add(1, Ordering::AcqRel); + let uuid = uuid::Uuid::new_v4(); + let current = format!("rh:{}:{:06}", uuid, nonce); + let mut guard = self.current.lock().unwrap(); + *guard = current.clone(); + current + } +} + #[cfg(test)] mod test { use super::*;