Skip to content

Commit

Permalink
fix(frontend): use set time zone in Hummock time travel (#20031)
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored Jan 6, 2025
1 parent aa3df8e commit 8a5a574
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 22 deletions.
14 changes: 10 additions & 4 deletions src/frontend/src/handler/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,14 @@ pub async fn handle_query(
formats: Vec<Format>,
) -> Result<RwPgResponse> {
let session = handler_args.session.clone();

let plan_fragmenter_result = {
let context = OptimizerContext::from_handler_args(handler_args);
let plan_result = gen_batch_plan_by_statement(&session, context.into(), stmt)?;
gen_batch_plan_fragmenter(&session, plan_result)?
// Time zone is used by Hummock time travel query.
risingwave_expr::expr_context::TIME_ZONE::sync_scope(
session.config().timezone().to_owned(),
|| gen_batch_plan_fragmenter(&session, plan_result),
)?
};
execute(session, plan_fragmenter_result, formats).await
}
Expand Down Expand Up @@ -99,8 +102,11 @@ pub async fn handle_execute(
let plan_fragmenter_result = {
let context = OptimizerContext::from_handler_args(handler_args);
let plan_result = gen_batch_query_plan(&session, context.into(), bound_result)?;

gen_batch_plan_fragmenter(&session, plan_result)?
// Time zone is used by Hummock time travel query.
risingwave_expr::expr_context::TIME_ZONE::sync_scope(
session.config().timezone().to_owned(),
|| gen_batch_plan_fragmenter(&session, plan_result),
)?
};
execute(session, plan_fragmenter_result, result_formats).await
}
Expand Down
57 changes: 39 additions & 18 deletions src/frontend/src/optimizer/plan_node/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::ops::Bound;
use std::vec;

use anyhow::anyhow;
use chrono::{MappedLocalTime, TimeZone};
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use pretty_xmlish::{Pretty, Str, StrAssocArr, XmlNode};
Expand All @@ -29,15 +30,27 @@ use risingwave_common::constants::log_store::v2::{
KV_LOG_STORE_PREDEFINED_COLUMNS, PK_ORDERING, VNODE_COLUMN_INDEX,
};
use risingwave_common::hash::VnodeCount;
use risingwave_common::types::ScalarImpl;
use risingwave_common::license::Feature;
use risingwave_common::types::{DataType, Interval, ScalarImpl, Timestamptz};
use risingwave_common::util::scan_range::{is_full_range, ScanRange};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_expr::aggregate::PbAggKind;
use risingwave_pb::plan_common::as_of::AsOfType;
use risingwave_pb::plan_common::{as_of, PbAsOf};
use risingwave_sqlparser::ast::AsOf;

use super::generic::{self, GenericPlanRef, PhysicalPlanRef};
use super::pretty_config;
use crate::catalog::table_catalog::TableType;
use crate::catalog::{ColumnId, TableCatalog, TableId};
use crate::error::{ErrorCode, Result};
use crate::expr::InputRef;
use crate::optimizer::plan_node::generic::Agg;
use crate::optimizer::plan_node::{BatchSimpleAgg, PlanAggCall};
use crate::optimizer::property::{Cardinality, Order, RequiredDist, WatermarkColumns};
use crate::optimizer::StreamScanType;
use crate::utils::{Condition, IndexSet};
use crate::PlanRef;

#[derive(Default)]
pub struct TableCatalogBuilder {
Expand Down Expand Up @@ -346,20 +359,6 @@ macro_rules! plan_node_name {
};
}
pub(crate) use plan_node_name;
use risingwave_common::license::Feature;
use risingwave_common::types::{DataType, Interval};
use risingwave_expr::aggregate::PbAggKind;
use risingwave_pb::plan_common::as_of::AsOfType;
use risingwave_pb::plan_common::{as_of, PbAsOf};
use risingwave_sqlparser::ast::AsOf;

use super::generic::{self, GenericPlanRef, PhysicalPlanRef};
use super::pretty_config;
use crate::error::{ErrorCode, Result};
use crate::expr::InputRef;
use crate::optimizer::plan_node::generic::Agg;
use crate::optimizer::plan_node::{BatchSimpleAgg, PlanAggCall};
use crate::PlanRef;

pub fn infer_kv_log_store_table_catalog_inner(
input: &PlanRef,
Expand Down Expand Up @@ -441,9 +440,31 @@ pub fn to_pb_time_travel_as_of(a: &Option<AsOf>) -> Result<Option<PbAsOf>> {
AsOf::TimestampString(ts) => {
let date_time = speedate::DateTime::parse_str_rfc3339(ts)
.map_err(|_e| anyhow!("fail to parse timestamp"))?;
AsOfType::Timestamp(as_of::Timestamp {
timestamp: date_time.timestamp_tz(),
})
let timestamp = if date_time.time.tz_offset.is_none() {
// If the input does not specify a time zone, use the time zone set by the "SET TIME ZONE" command.
risingwave_expr::expr_context::TIME_ZONE::try_with(|set_time_zone| {
let tz =
Timestamptz::lookup_time_zone(set_time_zone).map_err(|e| anyhow!(e))?;
match tz.with_ymd_and_hms(
date_time.date.year.into(),
date_time.date.month.into(),
date_time.date.day.into(),
date_time.time.hour.into(),
date_time.time.minute.into(),
date_time.time.second.into(),
) {
MappedLocalTime::Single(d) => Ok(d.timestamp()),
MappedLocalTime::Ambiguous(_, _) | MappedLocalTime::None => {
Err(anyhow!(format!(
"failed to parse the timestamp {ts} with the specified time zone {tz}"
)))
}
}
})??
} else {
date_time.timestamp_tz()
};
AsOfType::Timestamp(as_of::Timestamp { timestamp })
}
AsOf::VersionNum(_) | AsOf::VersionString(_) => {
return Err(ErrorCode::NotSupported(
Expand Down

0 comments on commit 8a5a574

Please sign in to comment.