Skip to content

Commit

Permalink
feat(core): Add route hint, make load balancer easier to route queries (
Browse files Browse the repository at this point in the history
#469)

* add RouteHintGenerator

* add in_active_transaction

* reset route hint

* chore: rename to route hint

* handle the route hint from query response
  • Loading branch information
flaneur2020 authored Aug 6, 2024
1 parent 3ca3dc3 commit dca4ea3
Showing 1 changed file with 62 additions and 5 deletions.
67 changes: 62 additions & 5 deletions core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::BTreeMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -41,6 +42,8 @@ const HEADER_QUERY_ID: &str = "X-DATABEND-QUERY-ID";
const HEADER_TENANT: &str = "X-DATABEND-TENANT";
const HEADER_WAREHOUSE: &str = "X-DATABEND-WAREHOUSE";
const HEADER_STAGE_NAME: &str = "X-DATABEND-STAGE-NAME";
const HEADER_ROUTE_HINT: &str = "X-DATABEND-ROUTE-HINT";
const TXN_STATE_ACTIVE: &str = "Active";

static VERSION: Lazy<String> = Lazy::new(|| {
let version = option_env!("CARGO_PKG_VERSION").unwrap_or("unknown");
Expand All @@ -61,6 +64,7 @@ pub struct APIClient {
tenant: Option<String>,
warehouse: Arc<Mutex<Option<String>>>,
session_state: Arc<Mutex<SessionState>>,
route_hint: Arc<RouteHintGenerator>,

wait_time_secs: Option<i64>,
max_rows_in_buffer: Option<i64>,
Expand Down Expand Up @@ -241,6 +245,15 @@ impl APIClient {
guard.role.clone()
}

async fn in_active_transaction(&self) -> bool {
let guard = self.session_state.lock().await;
guard
.txn_state
.as_ref()
.map(|s| s.eq_ignore_ascii_case(TXN_STATE_ACTIVE))
.unwrap_or(false)
}

pub fn username(&self) -> String {
self.auth.username()
}
Expand Down Expand Up @@ -280,6 +293,9 @@ impl APIClient {

pub async fn start_query(&self, sql: &str) -> Result<QueryResponse> {
info!("start query: {}", sql);
if !self.in_active_transaction().await {
self.route_hint.next();
}
let session_state = self.session_state().await;
let req = QueryRequest::new(sql)
.with_pagination(self.make_pagination())
Expand Down Expand Up @@ -308,13 +324,16 @@ impl APIClient {
)));
}

let resp: QueryResponse = resp.json().await?;
self.handle_session(&resp.session).await;
if let Some(err) = resp.error {
if let Some(route_hint) = resp.headers().get(HEADER_ROUTE_HINT) {
self.route_hint.set(route_hint.to_str().unwrap_or_default());
}
let result: QueryResponse = resp.json().await?;
self.handle_session(&result.session).await;
if let Some(err) = result.error {
return Err(Error::InvalidResponse(err));
}
self.handle_warnings(&resp);
Ok(resp)
self.handle_warnings(&result);
Ok(result)
}

pub async fn query_page(&self, query_id: &str, next_uri: &str) -> Result<QueryResponse> {
Expand Down Expand Up @@ -434,6 +453,8 @@ impl APIClient {
if let Some(warehouse) = &*warehouse {
headers.insert(HEADER_WAREHOUSE, warehouse.parse()?);
}
let route_hint = self.route_hint.current();
headers.insert(HEADER_ROUTE_HINT, route_hint.parse()?);
headers.insert(HEADER_QUERY_ID, query_id.parse()?);
Ok(headers)
}
Expand Down Expand Up @@ -588,10 +609,46 @@ impl Default for APIClient {
page_request_timeout: Duration::from_secs(30),
tls_ca_file: None,
presign: PresignMode::Auto,
route_hint: Arc::new(RouteHintGenerator::new()),
}
}
}

struct RouteHintGenerator {
nonce: AtomicU64,
current: std::sync::Mutex<String>,
}

impl RouteHintGenerator {
fn new() -> Self {
let gen = Self {
nonce: AtomicU64::new(0),
current: std::sync::Mutex::new("".to_string()),
};
gen.next();
gen
}

fn current(&self) -> String {
let guard = self.current.lock().unwrap();
guard.clone()
}

fn set(&self, hint: &str) {
let mut guard = self.current.lock().unwrap();
*guard = hint.to_string();
}

fn next(&self) -> String {
let nonce = self.nonce.fetch_add(1, Ordering::AcqRel);
let uuid = uuid::Uuid::new_v4();
let current = format!("rh:{}:{:06}", uuid, nonce);
let mut guard = self.current.lock().unwrap();
*guard = current.clone();
current
}
}

#[cfg(test)]
mod test {
use super::*;
Expand Down

0 comments on commit dca4ea3

Please sign in to comment.