From 8a5a574a65266e226c9cf944747f9069b0f800e5 Mon Sep 17 00:00:00 2001 From: zwang28 <70626450+zwang28@users.noreply.github.com> Date: Mon, 6 Jan 2025 22:28:16 +0800 Subject: [PATCH] fix(frontend): use set time zone in Hummock time travel (#20031) --- src/frontend/src/handler/query.rs | 14 +++-- src/frontend/src/optimizer/plan_node/utils.rs | 57 +++++++++++++------ 2 files changed, 49 insertions(+), 22 deletions(-) diff --git a/src/frontend/src/handler/query.rs b/src/frontend/src/handler/query.rs index 2dfe4ff37b197..ce7cef55c0b85 100644 --- a/src/frontend/src/handler/query.rs +++ b/src/frontend/src/handler/query.rs @@ -56,11 +56,14 @@ pub async fn handle_query( formats: Vec, ) -> Result { let session = handler_args.session.clone(); - let plan_fragmenter_result = { let context = OptimizerContext::from_handler_args(handler_args); let plan_result = gen_batch_plan_by_statement(&session, context.into(), stmt)?; - gen_batch_plan_fragmenter(&session, plan_result)? + // Time zone is used by Hummock time travel query. + risingwave_expr::expr_context::TIME_ZONE::sync_scope( + session.config().timezone().to_owned(), + || gen_batch_plan_fragmenter(&session, plan_result), + )? }; execute(session, plan_fragmenter_result, formats).await } @@ -99,8 +102,11 @@ pub async fn handle_execute( let plan_fragmenter_result = { let context = OptimizerContext::from_handler_args(handler_args); let plan_result = gen_batch_query_plan(&session, context.into(), bound_result)?; - - gen_batch_plan_fragmenter(&session, plan_result)? + // Time zone is used by Hummock time travel query. + risingwave_expr::expr_context::TIME_ZONE::sync_scope( + session.config().timezone().to_owned(), + || gen_batch_plan_fragmenter(&session, plan_result), + )? }; execute(session, plan_fragmenter_result, result_formats).await } diff --git a/src/frontend/src/optimizer/plan_node/utils.rs b/src/frontend/src/optimizer/plan_node/utils.rs index 432b4ccb6c3b6..a025ecdc4c756 100644 --- a/src/frontend/src/optimizer/plan_node/utils.rs +++ b/src/frontend/src/optimizer/plan_node/utils.rs @@ -18,6 +18,7 @@ use std::ops::Bound; use std::vec; use anyhow::anyhow; +use chrono::{MappedLocalTime, TimeZone}; use fixedbitset::FixedBitSet; use itertools::Itertools; use pretty_xmlish::{Pretty, Str, StrAssocArr, XmlNode}; @@ -29,15 +30,27 @@ use risingwave_common::constants::log_store::v2::{ KV_LOG_STORE_PREDEFINED_COLUMNS, PK_ORDERING, VNODE_COLUMN_INDEX, }; use risingwave_common::hash::VnodeCount; -use risingwave_common::types::ScalarImpl; +use risingwave_common::license::Feature; +use risingwave_common::types::{DataType, Interval, ScalarImpl, Timestamptz}; use risingwave_common::util::scan_range::{is_full_range, ScanRange}; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; +use risingwave_expr::aggregate::PbAggKind; +use risingwave_pb::plan_common::as_of::AsOfType; +use risingwave_pb::plan_common::{as_of, PbAsOf}; +use risingwave_sqlparser::ast::AsOf; +use super::generic::{self, GenericPlanRef, PhysicalPlanRef}; +use super::pretty_config; use crate::catalog::table_catalog::TableType; use crate::catalog::{ColumnId, TableCatalog, TableId}; +use crate::error::{ErrorCode, Result}; +use crate::expr::InputRef; +use crate::optimizer::plan_node::generic::Agg; +use crate::optimizer::plan_node::{BatchSimpleAgg, PlanAggCall}; use crate::optimizer::property::{Cardinality, Order, RequiredDist, WatermarkColumns}; use crate::optimizer::StreamScanType; use crate::utils::{Condition, IndexSet}; +use crate::PlanRef; #[derive(Default)] pub struct TableCatalogBuilder { @@ -346,20 +359,6 @@ macro_rules! plan_node_name { }; } pub(crate) use plan_node_name; -use risingwave_common::license::Feature; -use risingwave_common::types::{DataType, Interval}; -use risingwave_expr::aggregate::PbAggKind; -use risingwave_pb::plan_common::as_of::AsOfType; -use risingwave_pb::plan_common::{as_of, PbAsOf}; -use risingwave_sqlparser::ast::AsOf; - -use super::generic::{self, GenericPlanRef, PhysicalPlanRef}; -use super::pretty_config; -use crate::error::{ErrorCode, Result}; -use crate::expr::InputRef; -use crate::optimizer::plan_node::generic::Agg; -use crate::optimizer::plan_node::{BatchSimpleAgg, PlanAggCall}; -use crate::PlanRef; pub fn infer_kv_log_store_table_catalog_inner( input: &PlanRef, @@ -441,9 +440,31 @@ pub fn to_pb_time_travel_as_of(a: &Option) -> Result> { AsOf::TimestampString(ts) => { let date_time = speedate::DateTime::parse_str_rfc3339(ts) .map_err(|_e| anyhow!("fail to parse timestamp"))?; - AsOfType::Timestamp(as_of::Timestamp { - timestamp: date_time.timestamp_tz(), - }) + let timestamp = if date_time.time.tz_offset.is_none() { + // If the input does not specify a time zone, use the time zone set by the "SET TIME ZONE" command. + risingwave_expr::expr_context::TIME_ZONE::try_with(|set_time_zone| { + let tz = + Timestamptz::lookup_time_zone(set_time_zone).map_err(|e| anyhow!(e))?; + match tz.with_ymd_and_hms( + date_time.date.year.into(), + date_time.date.month.into(), + date_time.date.day.into(), + date_time.time.hour.into(), + date_time.time.minute.into(), + date_time.time.second.into(), + ) { + MappedLocalTime::Single(d) => Ok(d.timestamp()), + MappedLocalTime::Ambiguous(_, _) | MappedLocalTime::None => { + Err(anyhow!(format!( + "failed to parse the timestamp {ts} with the specified time zone {tz}" + ))) + } + } + })?? + } else { + date_time.timestamp_tz() + }; + AsOfType::Timestamp(as_of::Timestamp { timestamp }) } AsOf::VersionNum(_) | AsOf::VersionString(_) => { return Err(ErrorCode::NotSupported(