Skip to content

Commit

Permalink
chore: the fist http resp wait longer to tolerant old bendsql clients. (
Browse files Browse the repository at this point in the history
  • Loading branch information
youngsofun authored Mar 19, 2024
1 parent d924b68 commit 45fde57
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 13 deletions.
37 changes: 25 additions & 12 deletions src/query/service/src/servers/http/v1/http_query_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ impl QueryResponse {
(JsonBlock::empty(), None)
} else {
match state.state {
ExecuteStateKind::Running => match r.data {
ExecuteStateKind::Running | ExecuteStateKind::Starting => match r.data {
None => (JsonBlock::empty(), Some(make_state_uri(&id))),
Some(d) => {
let uri = match d.next_page_no {
Expand Down Expand Up @@ -179,9 +179,14 @@ impl QueryResponse {
};
let rows = data.data.len();

let state_kind = match state.state {
ExecuteStateKind::Starting => ExecuteStateKind::Running,
_ => state.state,
};

Json(QueryResponse {
data: data.into(),
state: state.state,
state: state_kind,
schema: state.schema.clone(),
session_id: Some(session_id),
node_id: r.node_id,
Expand Down Expand Up @@ -368,22 +373,30 @@ pub(crate) async fn query_handler(
match query {
Ok(query) => {
query.update_expire_time(true).await;
let resp = query
.get_response_page(0)
.await
.map_err(|err| err.display_with_sql(&sql))
.map_err(|err| poem::Error::from_string(err.message(), StatusCode::NOT_FOUND))?;
if matches!(resp.state.state,ExecuteStateKind::Failed) {
// tmp workaround to tolerant old clients
let max_wait_time = std::cmp::max(10, req.pagination.wait_time_secs);
let start = std::time::Instant::now();
let resp = loop {
let resp = query
.get_response_page(0)
.await
.map_err(|err| err.display_with_sql(&sql))
.map_err(|err| poem::Error::from_string(err.message(), StatusCode::NOT_FOUND))?;
if matches!(resp.state.state, ExecuteStateKind::Starting) && start.elapsed().as_secs() < max_wait_time as u64 {
continue;
}
break resp
};
if matches!(resp.state.state, ExecuteStateKind::Failed) {
ctx.set_fail();
}
let (rows, next_page) = match &resp.data {
None => (0, None),
Some(p) => (p.page.data.num_rows(), p.next_page_no),
};
info!(
"http query initial response to http query_id={}, state={:?}, rows={}, next_page={:?}, sql='{}'",
&query.id, &resp.state, rows, next_page, mask_connection_info(&sql)
);
info!( "http query initial response to http query_id={}, state={:?}, rows={}, next_page={:?}, sql='{}'",
&query.id, &resp.state, rows, next_page, mask_connection_info(&sql)
);
query.update_expire_time(false).await;
Ok(QueryResponse::from_internal(query.id.to_string(), resp, false).into_response())
}
Expand Down
4 changes: 3 additions & 1 deletion src/query/service/src/servers/http/v1/query/execute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use crate::sessions::TableContext;

#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq)]
pub enum ExecuteStateKind {
Starting,
Running,
Failed,
Succeeded,
Expand Down Expand Up @@ -93,7 +94,8 @@ pub enum ExecuteState {
impl ExecuteState {
pub(crate) fn extract(&self) -> (ExecuteStateKind, Option<ErrorCode>) {
match self {
Starting(_) | Running(_) => (ExecuteStateKind::Running, None),
Starting(_) => (ExecuteStateKind::Starting, None),
Running(_) => (ExecuteStateKind::Running, None),
Stopped(v) => match &v.reason {
Ok(_) => (ExecuteStateKind::Succeeded, None),
Err(e) => (ExecuteStateKind::Failed, Some(e.clone())),
Expand Down

0 comments on commit 45fde57

Please sign in to comment.