diff --git a/scripts/ci/deploy/config/databend-query-node-1.toml b/scripts/ci/deploy/config/databend-query-node-1.toml index 659dcc5150f7c..78bbe541bd70b 100644 --- a/scripts/ci/deploy/config/databend-query-node-1.toml +++ b/scripts/ci/deploy/config/databend-query-node-1.toml @@ -162,3 +162,6 @@ max_bytes = 21474836480 [spill] spill_local_disk_path = "./.databend/temp/_query_spill" +# Cap local spill to 1GB; the 20% window ratio below gives window spills a ~200MB per-query quota. +spill_local_disk_max_bytes = 1073741824 +window_partition_spilling_disk_quota_ratio = 20 diff --git a/scripts/databend_test_helper/databend_test_helper/configs/databend-query-node2.toml b/scripts/databend_test_helper/databend_test_helper/configs/databend-query-node2.toml index 395762577d3ce..31226f4fc4e8f 100644 --- a/scripts/databend_test_helper/databend_test_helper/configs/databend-query-node2.toml +++ b/scripts/databend_test_helper/databend_test_helper/configs/databend-query-node2.toml @@ -62,4 +62,4 @@ path = "_databend_data/cache/query2" max_bytes = 1073741824 # 1GB [spill] -spill_local_disk_path = "_databend_data/spill/query2" \ No newline at end of file +spill_local_disk_path = "_databend_data/spill/query2" diff --git a/scripts/databend_test_helper/databend_test_helper/configs/databend-query-node3.toml b/scripts/databend_test_helper/databend_test_helper/configs/databend-query-node3.toml index e1902a81ed411..e697e6bc74295 100644 --- a/scripts/databend_test_helper/databend_test_helper/configs/databend-query-node3.toml +++ b/scripts/databend_test_helper/databend_test_helper/configs/databend-query-node3.toml @@ -62,4 +62,4 @@ path = "_databend_data/cache/query3" max_bytes = 1073741824 # 1GB [spill] -spill_local_disk_path = "_databend_data/spill/query3" \ No newline at end of file +spill_local_disk_path = "_databend_data/spill/query3" diff --git a/scripts/test-bend-tests/configs/query/query-2.toml b/scripts/test-bend-tests/configs/query/query-2.toml index 80f28e95cad55..b2e30bcab830b 100644 --- a/scripts/test-bend-tests/configs/query/query-2.toml +++ b/scripts/test-bend-tests/configs/query/query-2.toml @@ -63,4 +63,4 @@ path = "_databend_data/cache/query2" max_bytes = 1073741824 # 1GB [spill] -spill_local_disk_path = "_databend_data/spill/query2" \ No newline at end of file +spill_local_disk_path = "_databend_data/spill/query2" diff --git a/scripts/test-bend-tests/configs/query/query-3.toml b/scripts/test-bend-tests/configs/query/query-3.toml index 5c0054c5677e9..de202339eb073 100644 --- a/scripts/test-bend-tests/configs/query/query-3.toml +++ b/scripts/test-bend-tests/configs/query/query-3.toml @@ -63,4 +63,4 @@ path = "_databend_data/cache/query3" max_bytes = 1073741824 # 1GB [spill] -spill_local_disk_path = "_databend_data/spill/query3" \ No newline at end of file +spill_local_disk_path = "_databend_data/spill/query3" diff --git a/scripts/test-bend-tests/configs/query/query-4.toml b/scripts/test-bend-tests/configs/query/query-4.toml index d33136fa5ba74..aa9fa7b01e555 100644 --- a/scripts/test-bend-tests/configs/query/query-4.toml +++ b/scripts/test-bend-tests/configs/query/query-4.toml @@ -63,4 +63,4 @@ path = "_databend_data/cache/query4" max_bytes = 1073741824 # 1GB [spill] -spill_local_disk_path = "_databend_data/spill/query4" \ No newline at end of file +spill_local_disk_path = "_databend_data/spill/query4" diff --git a/scripts/test-bend-tests/configs/query/query-5.toml b/scripts/test-bend-tests/configs/query/query-5.toml index 43d2a646cdb8e..1a55e279b6042 100644 --- a/scripts/test-bend-tests/configs/query/query-5.toml
+++ b/scripts/test-bend-tests/configs/query/query-5.toml @@ -63,4 +63,4 @@ path = "_databend_data/cache/query5" max_bytes = 1073741824 # 1GB [spill] -spill_local_disk_path = "_databend_data/spill/query5" \ No newline at end of file +spill_local_disk_path = "_databend_data/spill/query5" diff --git a/src/query/config/src/config.rs b/src/query/config/src/config.rs index 99729c0b702be..a5248a072a119 100644 --- a/src/query/config/src/config.rs +++ b/src/query/config/src/config.rs @@ -798,7 +798,7 @@ pub struct FsStorageConfig { impl FsStorageConfig { fn default_reserved_space_percentage() -> Option> { - None // Use None as default, will use system default (30.0) if not specified + None // Use None as default, will use system default (10.0) if not specified } } @@ -3585,13 +3585,32 @@ pub struct SpillConfig { #[clap(long, value_name = "VALUE", default_value = "")] pub spill_local_disk_path: String, - #[clap(long, value_name = "VALUE", default_value = "30")] + #[clap(long, value_name = "VALUE", default_value = "10")] /// Percentage of reserve disk space that won't be used for spill to local disk. pub spill_local_disk_reserved_space_percentage: OrderedFloat, #[clap(long, value_name = "VALUE", default_value = "18446744073709551615")] /// Allow space in bytes to spill to local disk. pub spill_local_disk_max_bytes: u64, + + /// Maximum percentage of the global local spill quota that a single sort + /// operator may use for one query. + /// + /// Value range: 0-100. Effective only when local spill is enabled (there is + /// a valid local spill path and non-zero `spill_local_disk_max_bytes`). + #[clap(long, value_name = "PERCENT", default_value = "60")] + pub sort_spilling_disk_quota_ratio: u64, + + /// Maximum percentage of the global local spill quota that window + /// partitioners may use for one query. + #[clap(long, value_name = "PERCENT", default_value = "60")] + pub window_partition_spilling_disk_quota_ratio: u64, + + /// Maximum percentage of the global local spill quota that HTTP + /// result-set spilling may use for one query. + /// TODO: keep 0 to avoid deleting local result-set spill dir before HTTP pagination finishes. + #[clap(long, value_name = "PERCENT", default_value = "0")] + pub result_set_spilling_disk_quota_ratio: u64, } impl SpillConfig { @@ -3630,8 +3649,12 @@ impl Default for SpillConfig { Self { storage: None, spill_local_disk_path: String::new(), - spill_local_disk_reserved_space_percentage: OrderedFloat(30.0), + spill_local_disk_reserved_space_percentage: OrderedFloat(10.0), spill_local_disk_max_bytes: u64::MAX, + sort_spilling_disk_quota_ratio: 60, + window_partition_spilling_disk_quota_ratio: 60, + // TODO: keep 0 to avoid deleting local result-set spill dir before HTTP pagination finishes. 
+ result_set_spilling_disk_quota_ratio: 0, } } } @@ -3706,7 +3729,7 @@ mod cache_config_converters { catalogs.insert(CATALOG_HIVE.to_string(), catalog); } - let spill = convert_local_spill_config(spill, &cache.disk_cache_config)?; + let spill = convert_local_spill_config(spill)?; Ok(InnerConfig { query: query.try_into()?, @@ -3803,10 +3826,7 @@ mod cache_config_converters { } } - fn convert_local_spill_config( - spill: SpillConfig, - cache: &DiskCacheConfig, - ) -> Result { + fn convert_local_spill_config(spill: SpillConfig) -> Result { // Determine configuration based on auto-detected spill type let spill_type = spill.get_spill_type(); let (local_writeable_root, path, reserved_disk_ratio, global_bytes_limit, storage_params) = @@ -3818,7 +3838,7 @@ mod cache_config_converters { let reserved_ratio = storage .fs .reserved_space_percentage - .unwrap_or(OrderedFloat(30.0)) + .unwrap_or(OrderedFloat(10.0)) / 100.0; let max_bytes = storage.fs.max_bytes.unwrap_or(u64::MAX); @@ -3864,16 +3884,11 @@ mod cache_config_converters { ) } _ => { - // Default behavior for "default" type and any unrecognized types - // Default behavior with backward compatibility - let local_writeable_root = if cache.path != DiskCacheConfig::default().path - && spill.spill_local_disk_path.is_empty() - { - Some(cache.path.clone()) - } else { - None - }; - + // Default behavior for "default" type and any unrecognized types: + // do NOT implicitly reuse the data cache disk. Local spill is + // enabled only when explicitly configured via either + // - [spill.storage] with type = "fs", or + // - legacy spill_local_disk_path. let storage_params = spill .storage .map(|storage| { @@ -3883,7 +3898,7 @@ mod cache_config_converters { .transpose()?; ( - local_writeable_root, + None, spill.spill_local_disk_path, spill.spill_local_disk_reserved_space_percentage / 100.0, spill.spill_local_disk_max_bytes, @@ -3898,6 +3913,10 @@ mod cache_config_converters { reserved_disk_ratio, global_bytes_limit, storage_params, + sort_spilling_disk_quota_ratio: spill.sort_spilling_disk_quota_ratio, + window_partition_spilling_disk_quota_ratio: spill + .window_partition_spilling_disk_quota_ratio, + result_set_spilling_disk_quota_ratio: spill.result_set_spilling_disk_quota_ratio, }) } @@ -3916,6 +3935,10 @@ mod cache_config_converters { spill_local_disk_path: value.path, spill_local_disk_reserved_space_percentage: value.reserved_disk_ratio * 100.0, spill_local_disk_max_bytes: value.global_bytes_limit, + sort_spilling_disk_quota_ratio: value.sort_spilling_disk_quota_ratio, + window_partition_spilling_disk_quota_ratio: value + .window_partition_spilling_disk_quota_ratio, + result_set_spilling_disk_quota_ratio: value.result_set_spilling_disk_quota_ratio, } } } diff --git a/src/query/config/src/inner.rs b/src/query/config/src/inner.rs index bf4def1fcf506..b3cc31b0a8415 100644 --- a/src/query/config/src/inner.rs +++ b/src/query/config/src/inner.rs @@ -787,6 +787,22 @@ pub struct SpillConfig { pub global_bytes_limit: u64, pub storage_params: Option, + + /// Maximum percentage of the global local spill quota that a single + /// sort operator may use for one query. + /// + /// Value range: 0-100. Effective only when local spill is enabled + /// (i.e. there is a valid local spill path and non-zero global + /// bytes limit). + pub sort_spilling_disk_quota_ratio: u64, + + /// Maximum percentage of the global local spill quota that window + /// partitioners may use for one query. 
+ pub window_partition_spilling_disk_quota_ratio: u64, + + /// Maximum percentage of the global local spill quota that HTTP + /// result-set spilling may use for one query. + pub result_set_spilling_disk_quota_ratio: u64, } impl SpillConfig { @@ -807,6 +823,44 @@ impl SpillConfig { None } + /// Helper to compute a per-query local spill quota (in bytes) from a + /// percentage of the global local spill limit. + /// + /// - If local spill is disabled (no local path or zero global + /// limit), returns 0. + /// - `ratio` is clamped into [0, 100]. + pub fn quota_bytes_from_ratio(&self, ratio: u64) -> usize { + // Only effective when local spill is enabled. + if self.local_path().is_none() { + return 0; + } + + let ratio = std::cmp::min(ratio, 100); + if ratio == 0 { + return 0; + } + + let bytes = self.global_bytes_limit.saturating_mul(ratio) / 100; + + // TempDirManager works with `usize` limits. + std::cmp::min(bytes, usize::MAX as u64) as usize + } + + /// Per-query quota for sort operators. + pub fn sort_spill_bytes_limit(&self) -> usize { + self.quota_bytes_from_ratio(self.sort_spilling_disk_quota_ratio) + } + + /// Per-query quota for window partitioners. + pub fn window_partition_spill_bytes_limit(&self) -> usize { + self.quota_bytes_from_ratio(self.window_partition_spilling_disk_quota_ratio) + } + + /// Per-query quota for HTTP result-set spilling. + pub fn result_set_spill_bytes_limit(&self) -> usize { + self.quota_bytes_from_ratio(self.result_set_spilling_disk_quota_ratio) + } + pub fn new_for_test(path: String, reserved_disk_ratio: f64, global_bytes_limit: u64) -> Self { Self { local_writeable_root: None, @@ -814,6 +868,11 @@ impl SpillConfig { reserved_disk_ratio: OrderedFloat(reserved_disk_ratio), global_bytes_limit, storage_params: None, + // Use the same defaults as the external config. + sort_spilling_disk_quota_ratio: 60, + window_partition_spilling_disk_quota_ratio: 60, + // TODO: keep 0 to avoid deleting local result-set spill dir before HTTP pagination finishes. + result_set_spilling_disk_quota_ratio: 0, } } } @@ -823,9 +882,13 @@ impl Default for SpillConfig { Self { local_writeable_root: None, path: "".to_string(), - reserved_disk_ratio: OrderedFloat(0.3), + reserved_disk_ratio: OrderedFloat(0.1), global_bytes_limit: u64::MAX, storage_params: None, + sort_spilling_disk_quota_ratio: 60, + window_partition_spilling_disk_quota_ratio: 60, + // TODO: keep 0 to avoid deleting local result-set spill dir before HTTP pagination finishes. + result_set_spilling_disk_quota_ratio: 0, } } } diff --git a/src/query/config/src/mask.rs b/src/query/config/src/mask.rs index 20a5247dee335..7bdc4c9f79b15 100644 --- a/src/query/config/src/mask.rs +++ b/src/query/config/src/mask.rs @@ -215,6 +215,9 @@ impl SpillConfig { ref spill_local_disk_path, spill_local_disk_reserved_space_percentage, spill_local_disk_max_bytes, + sort_spilling_disk_quota_ratio, + window_partition_spilling_disk_quota_ratio, + result_set_spilling_disk_quota_ratio, } = *self; Self { @@ -222,6 +225,10 @@ impl SpillConfig { spill_local_disk_path: spill_local_disk_path.clone(), spill_local_disk_reserved_space_percentage, spill_local_disk_max_bytes, + sort_spilling_disk_quota_ratio, + window_partition_spilling_disk_quota_ratio, + // TODO: keep 0 to avoid deleting local result-set spill dir before HTTP pagination finishes. 
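For illustration, a minimal standalone sketch (illustration only, not part of the patch) of how these ratios translate into per-query byte quotas, mirroring the `quota_bytes_from_ratio` helper added to inner.rs above; the real helper additionally returns 0 when no local spill path is configured.

fn quota_bytes(global_bytes_limit: u64, ratio: u64) -> usize {
    // Ratio is clamped into [0, 100]; 0 disables the per-query quota.
    let ratio = ratio.min(100);
    if ratio == 0 {
        return 0;
    }
    let bytes = global_bytes_limit.saturating_mul(ratio) / 100;
    // TempDirManager works with `usize` limits.
    bytes.min(usize::MAX as u64) as usize
}

fn main() {
    // Values from the node-1.toml example: 1GB global limit, 20% window ratio.
    let global = 1_073_741_824u64; // spill_local_disk_max_bytes
    assert_eq!(quota_bytes(global, 20), 214_748_364); // window: ~200MB per query
    assert_eq!(quota_bytes(global, 60), 644_245_094); // sort/window default 60%: ~614MiB
    assert_eq!(quota_bytes(global, 0), 0); // result set: disabled by default
}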
+ result_set_spilling_disk_quota_ratio, } } } @@ -375,6 +382,10 @@ mod tests { spill_local_disk_path: "".to_string(), spill_local_disk_reserved_space_percentage: 30.0.into(), spill_local_disk_max_bytes: 10, + sort_spilling_disk_quota_ratio: 60, + window_partition_spilling_disk_quota_ratio: 30, + // TODO: keep 0 to avoid deleting local result-set spill dir before HTTP pagination finishes. + result_set_spilling_disk_quota_ratio: 0, storage: Some(StorageConfig { typ: "s3".to_string(), s3: S3StorageConfig { diff --git a/src/query/service/src/physical_plans/physical_recluster.rs b/src/query/service/src/physical_plans/physical_recluster.rs index ff904e44abec5..50111a2533d51 100644 --- a/src/query/service/src/physical_plans/physical_recluster.rs +++ b/src/query/service/src/physical_plans/physical_recluster.rs @@ -21,6 +21,7 @@ use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::plan::ReclusterTask; use databend_common_catalog::table::Table; use databend_common_catalog::table_context::TableContext; +use databend_common_config::GlobalConfig; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::DataSchemaRef; @@ -319,9 +320,12 @@ impl IPhysicalPlan for HilbertPartition { )?; let settings = builder.settings.clone(); - let disk_bytes_limit = settings.get_window_partition_spilling_to_disk_bytes_limit()?; let temp_dir_manager = TempDirManager::instance(); + let disk_bytes_limit = GlobalConfig::instance() + .spill + .window_partition_spill_bytes_limit(); + let enable_dio = settings.get_enable_dio()?; let disk_spill = temp_dir_manager .get_disk_spill_dir(disk_bytes_limit, &builder.ctx.get_id()) diff --git a/src/query/service/src/physical_plans/physical_window_partition.rs b/src/query/service/src/physical_plans/physical_window_partition.rs index 0da66a86e595e..7158a06df5c53 100644 --- a/src/query/service/src/physical_plans/physical_window_partition.rs +++ b/src/query/service/src/physical_plans/physical_window_partition.rs @@ -18,6 +18,7 @@ use std::sync::atomic::AtomicUsize; use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::table_context::TableContext; +use databend_common_config::GlobalConfig; use databend_common_exception::Result; use databend_common_expression::SortColumnDescription; use databend_common_pipeline::core::ProcessorPtr; @@ -148,7 +149,9 @@ impl IPhysicalPlan for WindowPartition { } let temp_dir_manager = TempDirManager::instance(); - let disk_bytes_limit = settings.get_window_partition_spilling_to_disk_bytes_limit()?; + let disk_bytes_limit = GlobalConfig::instance() + .spill + .window_partition_spill_bytes_limit(); let enable_dio = settings.get_enable_dio()?; let disk_spill = temp_dir_manager .get_disk_spill_dir(disk_bytes_limit, &builder.ctx.get_id()) diff --git a/src/query/service/src/pipelines/builders/builder_sort.rs b/src/query/service/src/pipelines/builders/builder_sort.rs index dc6f6f06d3d3e..d397e71f41463 100644 --- a/src/query/service/src/pipelines/builders/builder_sort.rs +++ b/src/query/service/src/pipelines/builders/builder_sort.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use databend_common_config::GlobalConfig; use databend_common_exception::Result; use databend_common_expression::DataSchemaRef; use databend_common_expression::LimitType; @@ -132,7 +133,7 @@ impl SortPipelineBuilder { let spiller = { let temp_dir_manager = TempDirManager::instance(); - let disk_bytes_limit = settings.get_sort_spilling_to_disk_bytes_limit()?; + let disk_bytes_limit = 
GlobalConfig::instance().spill.sort_spill_bytes_limit(); let enable_dio = settings.get_enable_dio()?; let disk_spill = temp_dir_manager .get_disk_spill_dir(disk_bytes_limit, &self.ctx.get_id()) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs index 6b7d63e7d90da..8bd75194199b1 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs @@ -19,8 +19,6 @@ use std::time::Instant; use databend_common_base::base::GlobalUniqName; use databend_common_base::base::ProgressValues; -use databend_common_base::runtime::profile::Profile; -use databend_common_base::runtime::profile::ProfileStatisticsName; use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -42,6 +40,7 @@ use crate::pipelines::processors::transforms::aggregator::SerializedPayload; use crate::sessions::QueryContext; use crate::spillers::Layout; use crate::spillers::SpillAdapter; +use crate::spillers::SpillTarget; use crate::spillers::SpillsBufferPool; use crate::spillers::SpillsDataWriter; @@ -52,10 +51,12 @@ struct PayloadWriter { impl PayloadWriter { fn try_create(prefix: &str) -> Result { - let operator = DataOperator::instance().spill_operator(); + let data_operator = DataOperator::instance(); + let target = SpillTarget::from_storage_params(data_operator.spill_params()); + let operator = data_operator.spill_operator(); let buffer_pool = SpillsBufferPool::instance(); let file_path = format!("{}/{}", prefix, GlobalUniqName::unique()); - let spills_data_writer = buffer_pool.writer(operator, file_path.clone())?; + let spills_data_writer = buffer_pool.writer(operator, file_path.clone(), target)?; Ok(PayloadWriter { path: file_path, @@ -218,7 +219,7 @@ impl AggregatePayloadWriters { } let stats = self.write_stats.take(); - flush_write_profile(&self.ctx, stats); + flush_write_profile(&self.ctx, stats, self.is_local); Ok(spilled_payloads) } @@ -356,18 +357,23 @@ impl NewAggregateSpiller { row_group, } = payload; - let operator = DataOperator::instance().spill_operator(); + let data_operator = DataOperator::instance(); + let target = SpillTarget::from_storage_params(data_operator.spill_params()); + let operator = data_operator.spill_operator(); let buffer_pool = SpillsBufferPool::instance(); - let mut reader = - buffer_pool.reader(operator.clone(), location.clone(), vec![row_group.clone()])?; + let mut reader = buffer_pool.reader( + operator.clone(), + location.clone(), + vec![row_group.clone()], + target, + )?; let instant = Instant::now(); let data_block = reader.read(self.read_setting)?; let elapsed = instant.elapsed(); let read_size = reader.read_bytes(); - flush_read_profile(&elapsed, read_size); info!( "Read aggregate spill finished: (bucket: {}, location: {}, bytes: {}, rows: {}, elapsed: {:?})", @@ -400,31 +406,11 @@ impl NewAggregateSpiller { } } -fn flush_read_profile(elapsed: &Duration, read_bytes: usize) { - Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillReadCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillReadBytes, read_bytes); - Profile::record_usize_profile( - ProfileStatisticsName::RemoteSpillReadTime, - elapsed.as_millis() as usize, - ); -} - -fn 
flush_write_profile(ctx: &Arc, stats: WriteStats) { +fn flush_write_profile(ctx: &Arc, stats: WriteStats, _is_local: bool) { if stats.count == 0 && stats.bytes == 0 && stats.rows == 0 { return; } - if stats.count > 0 { - Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, stats.count); - Profile::record_usize_profile( - ProfileStatisticsName::RemoteSpillWriteTime, - stats.elapsed.as_millis() as usize, - ); - } - if stats.bytes > 0 { - Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteBytes, stats.bytes); - } - if stats.rows > 0 || stats.bytes > 0 { let progress_val = ProgressValues { rows: stats.rows, diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs index 7663631cf321c..2c9611c31d5a9 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs @@ -40,6 +40,7 @@ use crate::pipelines::processors::HashJoinDesc; use crate::sessions::QueryContext; use crate::spillers::Layout; use crate::spillers::SpillAdapter; +use crate::spillers::SpillTarget; use crate::spillers::SpillsBufferPool; use crate::spillers::SpillsDataReader; use crate::spillers::SpillsDataWriter; @@ -255,11 +256,14 @@ impl GraceHashJoin { } fn restore_build_data(&mut self) -> Result<()> { - let operator = DataOperator::instance().spill_operator(); + let data_operator = DataOperator::instance(); + let target = SpillTarget::from_storage_params(data_operator.spill_params()); + let operator = data_operator.spill_operator(); while let Some(data) = self.steal_restore_build_task() { let buffer_pool = SpillsBufferPool::instance(); - let mut reader = buffer_pool.reader(operator.clone(), data.path, data.row_groups)?; + let mut reader = + buffer_pool.reader(operator.clone(), data.path, data.row_groups, target)?; while let Some(data_block) = reader.read(self.read_settings)? 
{ self.memory_hash_join.add_block(Some(data_block))?; @@ -384,11 +388,12 @@ pub struct GraceJoinPartition { impl GraceJoinPartition { pub fn create(prefix: &str) -> Result { let data_operator = DataOperator::instance(); + let target = SpillTarget::from_storage_params(data_operator.spill_params()); let operator = data_operator.spill_operator(); let buffer_pool = SpillsBufferPool::instance(); let file_path = format!("{}/{}", prefix, GlobalUniqName::unique()); - let spills_data_writer = buffer_pool.writer(operator, file_path.clone())?; + let spills_data_writer = buffer_pool.writer(operator, file_path.clone(), target)?; Ok(GraceJoinPartition { path: file_path, @@ -456,9 +461,12 @@ impl<'a, T: GraceMemoryJoin> RestoreProbeStream<'a, T> { continue; } - let operator = DataOperator::instance().spill_operator(); + let data_operator = DataOperator::instance(); + let target = SpillTarget::from_storage_params(data_operator.spill_params()); + let operator = data_operator.spill_operator(); let buffer_pool = SpillsBufferPool::instance(); - let reader = buffer_pool.reader(operator, data.path, data.row_groups)?; + let reader = + buffer_pool.reader(operator, data.path, data.row_groups, target)?; self.spills_reader = Some(reader); break; } diff --git a/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer_v2.rs b/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer_v2.rs index 4a3268bc65c8e..eba1d66abb0e3 100644 --- a/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer_v2.rs +++ b/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer_v2.rs @@ -59,7 +59,7 @@ impl Writer for SpillWriter { fn need_new_file(&mut self, incoming_size: usize) -> Result { Ok(match self.file_writer() { - AnyFileWriter::Local(file_writer) => !file_writer.check_grow(incoming_size, true)?, + AnyFileWriter::Local { writer, .. } => !writer.check_grow(incoming_size, true)?, _ => false, }) } diff --git a/src/query/service/src/servers/http/v1/query/execute_state.rs b/src/query/service/src/servers/http/v1/query/execute_state.rs index fd0b1d3b13b0f..44fed5883b7fd 100644 --- a/src/query/service/src/servers/http/v1/query/execute_state.rs +++ b/src/query/service/src/servers/http/v1/query/execute_state.rs @@ -19,6 +19,7 @@ use std::time::SystemTime; use databend_common_base::base::ProgressValues; use databend_common_base::base::SpillProgress; use databend_common_base::runtime::CatchUnwindFuture; +use databend_common_config::GlobalConfig; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_exception::ResultExt; @@ -530,7 +531,9 @@ impl ExecuteState { let spiller = if settings.get_enable_result_set_spilling()? 
{ let temp_dir_manager = TempDirManager::instance(); - let disk_bytes_limit = settings.get_result_set_spilling_to_disk_bytes_limit()?; + let disk_bytes_limit = GlobalConfig::instance() + .spill + .result_set_spill_bytes_limit(); let enable_dio = settings.get_enable_dio()?; let disk_spill = temp_dir_manager .get_disk_spill_dir(disk_bytes_limit, &ctx.get_id()) diff --git a/src/query/service/src/spillers/adapter.rs b/src/query/service/src/spillers/adapter.rs index 83cb97a22a2d9..e52d47da6d69e 100644 --- a/src/query/service/src/spillers/adapter.rs +++ b/src/query/service/src/spillers/adapter.rs @@ -36,6 +36,7 @@ use opendal::Buffer; use opendal::Operator; use parquet::file::metadata::RowGroupMetaDataPtr; +use super::async_buffer::SpillTarget; use super::block_reader::BlocksReader; use super::block_writer::BlocksWriter; use super::inner::*; @@ -61,9 +62,9 @@ impl SpillAdapter for PartitionAdapter { .unwrap() .insert(location.clone(), layout.clone()); - if location.is_remote() { - self.ctx.as_ref().incr_spill_progress(1, size); - } + // Progress (SpillTotalStats) should reflect total spill volume, + // including both local and remote spill files. + self.ctx.as_ref().incr_spill_progress(1, size); self.ctx.as_ref().add_spill_file(location, layout); } @@ -232,7 +233,7 @@ impl Spiller { let instant = Instant::now(); let location = self.write_encodes(write_bytes, buf).await?; // Record statistics. - record_write_profile(location.is_local(), &instant, write_bytes); + record_write_profile(&location, &instant, write_bytes); self.adapter .add_spill_file(location.clone(), layout, write_bytes); @@ -276,7 +277,7 @@ impl Spiller { }; // Record statistics. - record_read_profile(location.is_local(), &instant, data.len()); + record_read_profile(location, &instant, data.len()); // Deserialize partitioned data block. let mut partitioned_data = Vec::with_capacity(partitions.len()); @@ -310,7 +311,7 @@ impl Spiller { Location::Remote(loc) => self.operator.read_with(loc).range(data_range).await?, }; - record_read_profile(location.is_local(), &instant, data.len()); + record_read_profile(location, &instant, data.len()); deserialize_block(layout, data) } @@ -485,18 +486,21 @@ impl SpillWriter { let start = std::time::Instant::now(); match &mut self.file_writer { - AnyFileWriter::Local(file_writer) => { - let row_group_meta = file_writer.flush_row_group(row_group)?; + AnyFileWriter::Local { writer, .. } => { + let row_group_meta = writer.flush_row_group(row_group)?; let size = row_group_meta.compressed_size() as _; self.spiller.adapter.update_progress(0, size); - record_write_profile(true, &start, size); + record_write_profile(SpillTarget::Local, &start, size); Ok(row_group_meta) } - AnyFileWriter::Remote(_, file_writer) => { - let row_group_meta = file_writer.flush_row_group(row_group)?; + AnyFileWriter::Remote { + path: _path, + writer, + } => { + let row_group_meta = writer.flush_row_group(row_group)?; let size = row_group_meta.compressed_size() as _; self.spiller.adapter.update_progress(0, size); - record_write_profile(false, &start, size); + record_write_profile(SpillTarget::Remote, &start, size); Ok(row_group_meta) } } @@ -508,15 +512,16 @@ impl SpillWriter { pub fn close(self) -> Result { let (metadata, location) = match self.file_writer { - AnyFileWriter::Local(file_writer) => { - let (metadata, path) = file_writer.finish()?; + AnyFileWriter::Local { writer, .. 
} => { + let (metadata, path) = writer.finish()?; + let location = Location::Local(path); self.spiller .adapter - .add_spill_file(Location::Local(path.clone()), Layout::Parquet); - (metadata, Location::Local(path)) + .add_spill_file(location.clone(), Layout::Parquet); + (metadata, location) } - AnyFileWriter::Remote(path, file_writer) => { - let (metadata, _) = file_writer.finish()?; + AnyFileWriter::Remote { path, writer } => { + let (metadata, _) = writer.finish()?; let location = Location::Remote(path); self.spiller @@ -563,7 +568,7 @@ impl SpillReader { )?; record_read_profile( - self.location.is_local(), + &self.location, &start, blocks.iter().map(DataBlock::memory_size).sum(), ); @@ -574,9 +579,8 @@ impl SpillReader { impl SpillAdapter for Arc { fn add_spill_file(&self, location: Location, layout: Layout, size: usize) { - if matches!(location, Location::Remote(_)) { - self.incr_spill_progress(1, size); - } + // Count both local and remote spills in SpillTotalStats. + self.incr_spill_progress(1, size); self.as_ref().add_spill_file(location, layout); } @@ -594,10 +598,16 @@ impl SpillAdapter for SortAdapter { fn add_spill_file(&self, location: Location, layout: Layout, size: usize) { match location { Location::Remote(_) => { + // Remote spill files are tracked in QueryContext for cleanup + // and contribute to the total spill progress. self.ctx.as_ref().incr_spill_progress(1, size); self.ctx.as_ref().add_spill_file(location, layout); } Location::Local(temp_path) => { + // Local spill files are tracked only in-memory for sort, but + // should still be counted in SpillTotalStats so that progress + // reflects total (local + remote) spilled bytes/files. + self.ctx.as_ref().incr_spill_progress(1, size); self.local_files.write().unwrap().insert(temp_path, layout); } } diff --git a/src/query/service/src/spillers/async_buffer.rs b/src/query/service/src/spillers/async_buffer.rs index 7b40b45b9f9da..976ba679b0c53 100644 --- a/src/query/service/src/spillers/async_buffer.rs +++ b/src/query/service/src/spillers/async_buffer.rs @@ -21,6 +21,7 @@ use std::sync::Arc; use std::sync::Condvar; use std::sync::Mutex; use std::sync::PoisonError; +use std::time::Instant; use arrow_schema::Schema; use bytes::Bytes; @@ -36,6 +37,7 @@ use databend_common_expression::DataBlock; use databend_common_expression::DataSchema; use databend_common_expression::DataSchemaRef; use databend_common_expression::TableSchemaRef; +use databend_common_meta_app::storage::StorageParams; use databend_common_storages_parquet::parquet_reader::row_group::get_ranges; use databend_common_storages_parquet::parquet_reader::RowGroupCore; use databend_common_storages_parquet::ReadSettings; @@ -56,8 +58,35 @@ use parquet::file::metadata::RowGroupMetaData; use parquet::file::properties::EnabledStatistics; use parquet::file::properties::WriterProperties; +use super::record_read_profile; +use super::record_write_profile; const CHUNK_SIZE: usize = 4 * 1024 * 1024; +#[derive(Clone, Copy)] +pub enum SpillTarget { + Local, + Remote, +} + +impl SpillTarget { + pub fn is_local(&self) -> bool { + matches!(self, SpillTarget::Local) + } + + /// Derive spill target (local vs remote) from storage params. + /// + /// Today we only treat `StorageParams::Fs` as local, everything + /// else (S3, Azure, memory, etc.) is considered remote. Centralizing + /// this decision here keeps higher-level operators simpler and avoids + /// duplicating the matching logic at each call site. 
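+    ///
+    /// For example, `from_storage_params(Some(&StorageParams::Fs(..)))` yields
+    /// `SpillTarget::Local`, while `None` or an object-store backend such as
+    /// `StorageParams::S3(..)` yields `SpillTarget::Remote`.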
+ pub fn from_storage_params(params: Option<&StorageParams>) -> Self { + match params { + Some(StorageParams::Fs(_)) => SpillTarget::Local, + _ => SpillTarget::Remote, + } + } +} + /// Buffer Pool Workflow for Spill Operations: /// /// Context: During query execution, when memory pressure is high, intermediate @@ -182,12 +211,21 @@ impl SpillsBufferPool { .expect("Buffer pool working queue need unbounded."); } - pub fn buffer_write(self: &Arc, writer: Writer) -> BufferWriter { - BufferWriter::new(writer, self.clone()) + pub fn buffer_write( + self: &Arc, + writer: Writer, + target: SpillTarget, + ) -> BufferWriter { + BufferWriter::new(writer, self.clone(), target) } - pub fn writer(self: &Arc, op: Operator, path: String) -> Result { - let writer = self.buffer_writer(op, path)?; + pub fn writer( + self: &Arc, + op: Operator, + path: String, + target: SpillTarget, + ) -> Result { + let writer = self.buffer_writer(op, path, target)?; Ok(SpillsDataWriter::Uninitialize(Some(writer))) } @@ -195,6 +233,7 @@ impl SpillsBufferPool { self: &Arc, op: Operator, path: String, + target: SpillTarget, ) -> Result { let pending_response = BufferOperatorResp::pending(); @@ -209,7 +248,7 @@ impl SpillsBufferPool { .try_send(operator) .expect("Buffer pool working queue need unbounded."); - Ok(self.buffer_write(pending_response.wait_and_take()?)) + Ok(self.buffer_write(pending_response.wait_and_take()?, target)) } pub fn reader( @@ -217,8 +256,9 @@ impl SpillsBufferPool { op: Operator, path: String, row_groups: Vec, + target: SpillTarget, ) -> Result { - SpillsDataReader::create(path, op, row_groups, self.clone()) + SpillsDataReader::create(path, op, row_groups, self.clone(), target) } pub fn fetch_ranges( @@ -257,16 +297,22 @@ pub struct BufferWriter { buffer_pool: Arc, pending_buffers: VecDeque, pending_response: Option>>, + target: SpillTarget, } impl BufferWriter { - pub fn new(writer: Writer, buffer_pool: Arc) -> BufferWriter { + pub fn new( + writer: Writer, + buffer_pool: Arc, + target: SpillTarget, + ) -> BufferWriter { BufferWriter { buffer_pool, writer: Some(writer), current_bytes: None, pending_buffers: VecDeque::new(), pending_response: None, + target, } } @@ -307,6 +353,8 @@ impl BufferWriter { writer, response: pending_response, buffers: std::mem::take(&mut self.pending_buffers), + target: self.target, + start: Instant::now(), }); self.buffer_pool.operator(operator); @@ -345,6 +393,7 @@ impl BufferWriter { buffer_pool: self.buffer_pool.clone(), pending_buffers: Default::default(), pending_response: None, + target: self.target, }) .close() } @@ -534,6 +583,7 @@ pub struct SpillsDataReader { data_schema: DataSchemaRef, field_levels: FieldLevels, read_bytes: usize, + target: SpillTarget, } impl SpillsDataReader { @@ -542,6 +592,7 @@ impl SpillsDataReader { operator: Operator, row_groups: Vec, spills_buffer_pool: Arc, + target: SpillTarget, ) -> Result { if row_groups.is_empty() { return Err(ErrorCode::Internal( @@ -566,6 +617,7 @@ impl SpillsDataReader { field_levels, row_groups: VecDeque::from(row_groups), read_bytes: 0, + target, }) } @@ -578,6 +630,8 @@ impl SpillsDataReader { return Ok(None); }; + let start = Instant::now(); + let mut row_group = RowGroupCore::new(row_group, None); let read_bytes = Cell::new(0usize); @@ -606,6 +660,7 @@ impl SpillsDataReader { )?; let batch = reader.next().transpose()?.unwrap(); debug_assert!(reader.next().is_none()); + record_read_profile(self.target, &start, read_bytes.get()); Ok(Some(DataBlock::from_record_batch( &self.data_schema, &batch, @@ -623,6 +678,8 
@@ pub struct BufferWriteOperator { writer: Writer, buffers: VecDeque, response: Arc>, + target: SpillTarget, + start: Instant, } pub struct BufferCloseResp { @@ -714,7 +771,9 @@ impl Background { pub async fn recv(&mut self, op: BufferOperator) { match op { BufferOperator::Write(mut op) => { + let start = op.start; let bytes = op.buffers.clone(); + let bytes_len = bytes.iter().map(|b| b.len()).sum(); let mut error = op .writer .write(op.buffers) @@ -745,6 +804,8 @@ impl Background { error, writer: op.writer, }); + + record_write_profile(op.target, &start, bytes_len); } BufferOperator::Close(mut op) => { let res = op.writer.close().await; @@ -811,7 +872,7 @@ mod tests { let operator = create_test_operator().unwrap(); let writer = operator.writer("test_file").await.unwrap(); - let mut buffer_writer = BufferWriter::new(writer, pool); + let mut buffer_writer = BufferWriter::new(writer, pool, SpillTarget::Remote); let data = b"Hello, World!"; let written = buffer_writer.write(data).unwrap(); @@ -829,7 +890,7 @@ mod tests { let operator = create_test_operator().unwrap(); let writer = operator.writer("large_file").await.unwrap(); - let mut buffer_writer = BufferWriter::new(writer, pool); + let mut buffer_writer = BufferWriter::new(writer, pool, SpillTarget::Remote); // Write data larger than single buffer let large_data = vec![0u8; 8 * 1024 * 1024]; // 8MB @@ -848,7 +909,7 @@ mod tests { let operator = create_test_operator().unwrap(); let writer = operator.writer("multi_write_file").await.unwrap(); - let mut buffer_writer = BufferWriter::new(writer, pool); + let mut buffer_writer = BufferWriter::new(writer, pool, SpillTarget::Remote); let mut total_written = 0; for i in 0..100 { @@ -870,7 +931,7 @@ mod tests { let operator = create_test_operator().unwrap(); let writer = operator.writer("backpressure_test").await.unwrap(); - let mut buffer_writer = BufferWriter::new(writer, pool.clone()); + let mut buffer_writer = BufferWriter::new(writer, pool.clone(), SpillTarget::Remote); // Fill the first buffer let data = vec![0u8; CHUNK_SIZE]; @@ -923,7 +984,7 @@ mod tests { let operator = create_test_operator().unwrap(); let writer = operator.writer("empty_test").await.unwrap(); - let mut buffer_writer = BufferWriter::new(writer, pool); + let mut buffer_writer = BufferWriter::new(writer, pool, SpillTarget::Remote); let written = buffer_writer.write(b"").unwrap(); assert_eq!(written, 0); @@ -940,7 +1001,7 @@ mod tests { let operator = create_test_operator().unwrap(); let writer = operator.writer("no_write_test").await.unwrap(); - let buffer_writer = BufferWriter::new(writer, pool); + let buffer_writer = BufferWriter::new(writer, pool, SpillTarget::Remote); // Should be able to close without any writes let metadata = buffer_writer.close().unwrap(); @@ -966,7 +1027,7 @@ mod tests { .writer(&format!("concurrent_{}", i)) .await .unwrap(); - let mut buffer_writer = BufferWriter::new(writer, pool_clone); + let mut buffer_writer = BufferWriter::new(writer, pool_clone, SpillTarget::Remote); for j in 0..10 { let data = format!("Writer {} - Line {}\n", i, j); @@ -996,7 +1057,7 @@ mod tests { let operator = create_test_operator().unwrap(); let writer = operator.writer("error_test").await.unwrap(); - let mut buffer_writer = pool.buffer_write(writer); + let mut buffer_writer = pool.buffer_write(writer, SpillTarget::Remote); buffer_writer.write_all(b"test data").unwrap(); // Close once @@ -1004,12 +1065,12 @@ mod tests { // Create another writer and try to close it twice let writer2 = 
operator.writer("error_test2").await.unwrap(); - let buffer_writer2 = pool.buffer_write(writer2); + let buffer_writer2 = pool.buffer_write(writer2, SpillTarget::Remote); let _metadata = buffer_writer2.close().unwrap(); // Second close should return error (create new writer for this test) let writer3 = operator.writer("error_test3").await.unwrap(); - let buffer_writer3 = pool.buffer_write(writer3); + let buffer_writer3 = pool.buffer_write(writer3, SpillTarget::Remote); let _metadata = buffer_writer3.close().unwrap(); } } diff --git a/src/query/service/src/spillers/inner.rs b/src/query/service/src/spillers/inner.rs index 4fd22b727be09..9b4fe414424e7 100644 --- a/src/query/service/src/spillers/inner.rs +++ b/src/query/service/src/spillers/inner.rs @@ -31,6 +31,7 @@ use opendal::services::Fs; use opendal::Buffer; use opendal::Operator; +use super::async_buffer::SpillTarget; use super::serialize::*; use super::Location; @@ -159,7 +160,7 @@ impl SpillerInner { let location = self.write_encodes(data_size, buf).await?; // Record statistics. - record_write_profile(location.is_local(), &instant, data_size); + record_write_profile(&location, &instant, data_size); let layout = columns_layout.pop().unwrap(); Ok((location, layout, data_size)) } @@ -201,7 +202,7 @@ impl SpillerInner { Location::Remote(loc) => self.operator.read(loc).await?, }; - record_read_profile(location.is_local(), &instant, data.len()); + record_read_profile(location, &instant, data.len()); deserialize_block(columns_layout, data) } @@ -275,38 +276,75 @@ impl SpillerInner { } } -pub(super) fn record_write_profile(is_local: bool, start: &Instant, write_bytes: usize) { - if !is_local { - Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteBytes, write_bytes); - Profile::record_usize_profile( - ProfileStatisticsName::RemoteSpillWriteTime, - start.elapsed().as_millis() as usize, - ); - } else { - Profile::record_usize_profile(ProfileStatisticsName::LocalSpillWriteCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::LocalSpillWriteBytes, write_bytes); - Profile::record_usize_profile( - ProfileStatisticsName::LocalSpillWriteTime, - start.elapsed().as_millis() as usize, - ); +impl From<&Location> for SpillTarget { + fn from(value: &Location) -> Self { + if value.is_local() { + SpillTarget::Local + } else { + SpillTarget::Remote + } } } -pub(super) fn record_read_profile(is_local: bool, start: &Instant, read_bytes: usize) { - if is_local { - Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillReadCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillReadBytes, read_bytes); - Profile::record_usize_profile( - ProfileStatisticsName::RemoteSpillReadTime, - start.elapsed().as_millis() as usize, - ); - } else { - Profile::record_usize_profile(ProfileStatisticsName::LocalSpillReadCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::LocalSpillReadBytes, read_bytes); - Profile::record_usize_profile( - ProfileStatisticsName::LocalSpillReadTime, - start.elapsed().as_millis() as usize, - ); +impl From for SpillTarget { + fn from(value: Location) -> Self { + (&value).into() } } + +fn record_spill_profile( + locality: SpillTarget, + start: &Instant, + bytes: usize, + local_count: ProfileStatisticsName, + local_bytes: ProfileStatisticsName, + local_time: ProfileStatisticsName, + remote_count: ProfileStatisticsName, + remote_bytes: ProfileStatisticsName, + remote_time: ProfileStatisticsName, +) { + 
match locality { + SpillTarget::Local => { + Profile::record_usize_profile(local_count, 1); + Profile::record_usize_profile(local_bytes, bytes); + Profile::record_usize_profile(local_time, start.elapsed().as_millis() as usize); + } + SpillTarget::Remote => { + Profile::record_usize_profile(remote_count, 1); + Profile::record_usize_profile(remote_bytes, bytes); + Profile::record_usize_profile(remote_time, start.elapsed().as_millis() as usize); + } + } +} + +pub fn record_write_profile>( + locality: T, + start: &Instant, + write_bytes: usize, +) { + record_spill_profile( + locality.into(), + start, + write_bytes, + ProfileStatisticsName::LocalSpillWriteCount, + ProfileStatisticsName::LocalSpillWriteBytes, + ProfileStatisticsName::LocalSpillWriteTime, + ProfileStatisticsName::RemoteSpillWriteCount, + ProfileStatisticsName::RemoteSpillWriteBytes, + ProfileStatisticsName::RemoteSpillWriteTime, + ); +} + +pub fn record_read_profile>(locality: T, start: &Instant, read_bytes: usize) { + record_spill_profile( + locality.into(), + start, + read_bytes, + ProfileStatisticsName::LocalSpillReadCount, + ProfileStatisticsName::LocalSpillReadBytes, + ProfileStatisticsName::LocalSpillReadTime, + ProfileStatisticsName::RemoteSpillReadCount, + ProfileStatisticsName::RemoteSpillReadBytes, + ProfileStatisticsName::RemoteSpillReadTime, + ); +} diff --git a/src/query/service/src/spillers/mod.rs b/src/query/service/src/spillers/mod.rs index 0aec89075e6eb..35071c2d6e14c 100644 --- a/src/query/service/src/spillers/mod.rs +++ b/src/query/service/src/spillers/mod.rs @@ -25,6 +25,7 @@ mod test_memory; pub use adapter::*; pub use async_buffer::BufferWriter; +pub use async_buffer::SpillTarget; pub use async_buffer::SpillsBufferPool; pub use async_buffer::SpillsDataReader; pub use async_buffer::SpillsDataWriter; diff --git a/src/query/service/src/spillers/row_group_encoder.rs b/src/query/service/src/spillers/row_group_encoder.rs index 01ac5169ccc71..f832c69e5865d 100644 --- a/src/query/service/src/spillers/row_group_encoder.rs +++ b/src/query/service/src/spillers/row_group_encoder.rs @@ -54,6 +54,7 @@ use parquet::file::writer::SerializedRowGroupWriter; use parquet::schema::types::SchemaDescriptor; use super::async_buffer::BufferWriter; +use super::async_buffer::SpillTarget; use super::Location; use super::SpillerInner; use super::SpillsBufferPool; @@ -411,15 +412,23 @@ impl RangeFetchPlan { } pub enum AnyFileWriter { - Local(FileWriter), - Remote(String, FileWriter), + Local { + /// Absolute path string for symmetry with remote; TempPath is kept + /// inside the writer to avoid sharing/cloning while writing. + path: String, + writer: FileWriter, + }, + Remote { + path: String, + writer: FileWriter, + }, } impl AnyFileWriter { pub(super) fn new_row_group(&self) -> RowGroupEncoder { match self { - AnyFileWriter::Local(file_writer) => file_writer.new_row_group(), - AnyFileWriter::Remote(_, file_writer) => file_writer.new_row_group(), + AnyFileWriter::Local { writer, .. } => writer.new_row_group(), + AnyFileWriter::Remote { writer, .. 
} => writer.new_row_group(), } } } @@ -441,23 +450,30 @@ impl SpillerInner { let align = dir.block_alignment(); let buf = DmaWriteBuf::new(align, chunk); + let path_str = path.as_ref().display().to_string(); + let w = LocalWriter { dir: dir.clone(), path, file, buf, }; - return Ok(AnyFileWriter::Local(FileWriter::new(props, w)?)); + let writer = FileWriter::new(props, w)?; + return Ok(AnyFileWriter::Local { + path: path_str, + writer, + }); } }; let remote_location = self.create_unique_location(); - let remote = pool.buffer_writer(op.clone(), remote_location.clone())?; + let remote = + pool.buffer_writer(op.clone(), remote_location.clone(), SpillTarget::Remote)?; - Ok(AnyFileWriter::Remote( - remote_location, - FileWriter::new(props, remote)?, - )) + Ok(AnyFileWriter::Remote { + path: remote_location.clone(), + writer: FileWriter::new(props, remote)?, + }) } pub(super) fn load_row_groups( diff --git a/src/query/service/src/test_kits/config.rs b/src/query/service/src/test_kits/config.rs index d24472bd84c81..ad4e1a59bed27 100644 --- a/src/query/service/src/test_kits/config.rs +++ b/src/query/service/src/test_kits/config.rs @@ -15,6 +15,7 @@ use databend_common_base::base::GlobalUniqName; use databend_common_config::BuiltInConfig; use databend_common_config::InnerConfig; +use databend_common_config::SpillConfig; use databend_common_config::UDFConfig; use databend_common_config::UserAuthConfig; use databend_common_config::UserConfig; @@ -59,7 +60,7 @@ ADDRESS = 'https://databend.com';" // set node_id to a unique value conf.query.node_id = GlobalUniqName::unique(); - // set storage to fs + // set storage to fs for tests; individual tests may override this let tmp_dir = TempDir::new().expect("create tmp dir failed"); let root = tmp_dir.path().to_str().unwrap().to_string(); conf.storage.params = StorageParams::Fs(StorageFsConfig { root }); @@ -184,3 +185,20 @@ ADDRESS = 'https://databend.com';" self.conf.clone() } } + +/// Helper for spill-related tests: start from the default test config and +/// attach a local spill configuration so that TempDirManager has a root path. +pub fn config_with_spill() -> InnerConfig { + let mut conf = ConfigBuilder::create().config(); + + // Use a stable directory under the workspace for spill during tests. + // TempDirManager::init will create and clean up directories under this + // path, so it's safe to reuse across tests. 
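+    //
+    // Typical usage in a spill test (see the tests under tests/it/spillers):
+    //   let config = config_with_spill();
+    //   let fixture = TestFixture::setup_with_config(&config).await?;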
+ conf.spill = SpillConfig::new_for_test( + ".databend/_query_spill".to_string(), + 0.0, + 1024 * 1024 * 1024, + ); + + conf +} diff --git a/src/query/service/src/test_kits/mod.rs b/src/query/service/src/test_kits/mod.rs index e5a1f7102d2f3..c9fe82f456992 100644 --- a/src/query/service/src/test_kits/mod.rs +++ b/src/query/service/src/test_kits/mod.rs @@ -26,6 +26,7 @@ mod old_version_generator; pub use block_writer::BlockWriter; pub use check::*; pub use cluster::ClusterDescriptor; +pub use config::config_with_spill; pub use config::ConfigBuilder; pub use context::*; pub use fixture::*; diff --git a/src/query/service/tests/it/configs.rs b/src/query/service/tests/it/configs.rs index 7748efe569796..fc9dfbe068058 100644 --- a/src/query/service/tests/it/configs.rs +++ b/src/query/service/tests/it/configs.rs @@ -948,7 +948,7 @@ spill_local_disk_path = "/data/spill" let cfg = InnerConfig::load_for_test().expect("config load failed"); assert_eq!(cfg.spill.local_path(), Some("/data/spill".into())); - assert_eq!(cfg.spill.reserved_disk_ratio, 0.3); + assert_eq!(cfg.spill.reserved_disk_ratio, 0.1); }, ); @@ -975,7 +975,8 @@ fn test_spill_config_comprehensive() -> Result<()> { vec![("CONFIG_FILE", Some(file_path.to_string_lossy().as_ref()))], || { let cfg = InnerConfig::load_for_test().expect("config load failed"); - // Default behavior: no explicit local path, falls back to cache + // Default behavior: no explicit local path, local spill is disabled + // unless [spill] is explicitly configured. assert_eq!(cfg.spill.local_path(), None); }, ); @@ -1065,7 +1066,7 @@ secret_access_key = "test-secret" r#"[spill] spill_local_disk_path = "/legacy/spill/path" spill_local_disk_reserved_space_percentage = 25.0 -spill_local_disk_max_bytes = 53687091200 +spill_local_disk_max_bytes = 1073741824 "# .as_bytes(), )?; @@ -1077,7 +1078,7 @@ spill_local_disk_max_bytes = 53687091200 let cfg = InnerConfig::load_for_test().expect("config load failed"); assert_eq!(cfg.spill.local_path(), Some("/legacy/spill/path".into())); assert_eq!(cfg.spill.reserved_disk_ratio.into_inner(), 0.25); - assert_eq!(cfg.spill.global_bytes_limit, 53687091200); + assert_eq!(cfg.spill.global_bytes_limit, 1073741824); }, ); fs::remove_file(file_path)?; diff --git a/src/query/service/tests/it/spillers/mod.rs b/src/query/service/tests/it/spillers/mod.rs index c2bfcab0d3622..039800621c60a 100644 --- a/src/query/service/tests/it/spillers/mod.rs +++ b/src/query/service/tests/it/spillers/mod.rs @@ -13,4 +13,7 @@ // limitations under the License. 
mod spill_config_behavior; +mod spill_fallback; +mod spill_profile; +mod spill_target; mod spiller; diff --git a/src/query/service/tests/it/spillers/spill_config_behavior.rs b/src/query/service/tests/it/spillers/spill_config_behavior.rs index 179222bc625cd..f4c4b13407b6c 100644 --- a/src/query/service/tests/it/spillers/spill_config_behavior.rs +++ b/src/query/service/tests/it/spillers/spill_config_behavior.rs @@ -30,7 +30,7 @@ use databend_query::spillers::Location; use databend_query::spillers::Spiller; use databend_query::spillers::SpillerConfig; use databend_query::spillers::SpillerType; -use databend_query::test_kits::ConfigBuilder; +use databend_query::test_kits::config_with_spill; use databend_query::test_kits::TestFixture; use databend_storages_common_cache::TempDirManager; @@ -39,8 +39,8 @@ use databend_storages_common_cache::TempDirManager; async fn test_spill_directory_operations() -> Result<()> { println!("🔄 Starting spill directory operations test"); - // Setup with default config - let config = ConfigBuilder::create().build(); + // Setup with spill-enabled config dedicated for spill tests + let config = config_with_spill(); let fixture = TestFixture::setup_with_config(&config).await?; let ctx = fixture.new_query_ctx().await?; @@ -102,7 +102,7 @@ async fn test_spill_directory_operations() -> Result<()> { /// Test 2: Spill size limit enforcement #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_spill_size_limits() -> Result<()> { - let config = ConfigBuilder::create().build(); + let config = config_with_spill(); let fixture = TestFixture::setup_with_config(&config).await?; let ctx = fixture.new_query_ctx().await?; @@ -144,7 +144,7 @@ async fn test_spill_size_limits() -> Result<()> { /// Test 3: Verify spill data integrity and actual file operations #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_spill_data_integrity() -> Result<()> { - let config = ConfigBuilder::create().build(); + let config = config_with_spill(); let fixture = TestFixture::setup_with_config(&config).await?; let ctx = fixture.new_query_ctx().await?; diff --git a/src/query/service/tests/it/spillers/spill_fallback.rs b/src/query/service/tests/it/spillers/spill_fallback.rs new file mode 100644 index 0000000000000..c6a7a34da9bdc --- /dev/null +++ b/src/query/service/tests/it/spillers/spill_fallback.rs @@ -0,0 +1,117 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
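+//! Spill fallback tests: verify that spilling falls back to remote storage
+//! once the per-query local disk quota is exhausted.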
+ +use databend_common_base::base::tokio; +use databend_common_catalog::table_context::TableContext; +use databend_common_exception::Result; +use databend_common_expression::types::Int32Type; +use databend_common_expression::DataBlock; +use databend_common_expression::FromData; +use databend_common_storage::DataOperator; +use databend_query::spillers::Location; +use databend_query::spillers::Spiller; +use databend_query::spillers::SpillerConfig; +use databend_query::spillers::SpillerDiskConfig; +use databend_query::spillers::SpillerType; +use databend_query::test_kits::config_with_spill; +use databend_query::test_kits::TestFixture; +use databend_storages_common_cache::TempDirManager; + +/// ASCII flow of the test (data view): +/// +/// Int32 rows +/// | +/// v +/// DataBlock (rows_per_block = 32 * 1024) +/// | +/// v +/// Spiller::spill(vec![block.clone()]) -- repeated writes +/// | +/// v +/// Local spill files on disk (quota: 32 * 1024 bytes) +/// | +/// | accumulate used_local_bytes from each Local(path) +/// v +/// when used_local_bytes > quota +/// | +/// v +/// next spill -> Location::Remote(...) +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_spill_fallback_to_remote_when_local_full() -> Result<()> { + let config = config_with_spill(); + let fixture = TestFixture::setup_with_config(&config).await?; + let ctx = fixture.new_query_ctx().await?; + + // Prepare a small local spill quota to force fallback after a few writes. + let limit = 32 * 1024; // 32KB + let temp_manager = TempDirManager::instance(); + let temp_dir = temp_manager + .get_disk_spill_dir(limit, &ctx.get_id()) + .expect("local spill should be available"); + let disk_spill = SpillerDiskConfig::new(temp_dir, false)?; + + let spiller_config = SpillerConfig { + spiller_type: SpillerType::Aggregation, + location_prefix: ctx.query_id_spill_prefix(), + disk_spill: Some(disk_spill), + use_parquet: false, + }; + + let operator = DataOperator::instance().spill_operator(); + let spiller = Spiller::create(ctx.clone(), operator, spiller_config)?; + + // Use a stable block size that fits into the tiny quota but will exhaust it after a few rounds. + let rows_per_block = 32 * 1024; + let block = DataBlock::new_from_columns(vec![Int32Type::from_data(vec![1i32; rows_per_block])]); + + let loc1 = spiller.spill(vec![block.clone()]).await?; + let first_block_bytes = match &loc1 { + Location::Local(path) => path.size(), + Location::Remote(_) => { + panic!("first spill should land on local because quota is available") + } + }; + let mut locations = vec![loc1]; + + let mut used_local_bytes = first_block_bytes; + let mut saw_remote = false; + // Cap the attempts to ensure the test finishes quickly even with tiny blocks. + let max_attempts = (limit / first_block_bytes.max(1)).saturating_add(8); + for _ in 0..max_attempts { + let loc = spiller.spill(vec![block.clone()]).await?; + match &loc { + Location::Local(path) => { + used_local_bytes += path.size(); + } + Location::Remote(_) => { + saw_remote = true; + break; + } + } + locations.push(loc); + } + assert!( + saw_remote, + "should fallback to remote when local quota is exhausted (used_local_bytes={}, limit={}, first_block_bytes={}, attempts={})", + used_local_bytes, + limit, + first_block_bytes, + max_attempts + ); + + // Cleanup the temp directory for this query. 
+ let _ = temp_manager.drop_disk_spill_dir(&ctx.get_id()); + + Ok(()) +} diff --git a/src/query/service/tests/it/spillers/spill_profile.rs b/src/query/service/tests/it/spillers/spill_profile.rs new file mode 100644 index 0000000000000..d097e7f273908 --- /dev/null +++ b/src/query/service/tests/it/spillers/spill_profile.rs @@ -0,0 +1,128 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Spill profile related tests: verify spill read/write metrics wiring to Profile. + +use std::sync::Arc; +use std::time::Instant; + +use databend_common_base::runtime::profile::Profile; +use databend_common_base::runtime::profile::ProfileStatisticsName; +use databend_common_base::runtime::ThreadTracker; +use databend_query::spillers::record_read_profile; +use databend_query::spillers::record_write_profile; +use databend_query::spillers::SpillTarget; + +fn create_test_profile() -> Arc { + Arc::new(Profile::create( + 0, + "test_spill_profile".to_string(), + None, + None, + None, + Arc::new(String::new()), + Arc::new(vec![]), + None, + )) +} + +#[test] +fn test_spill_profile_write_local_and_remote() { + ThreadTracker::init(); + + // Local spill metrics + let mut payload_local = ThreadTracker::new_tracking_payload(); + let local_profile = create_test_profile(); + payload_local.profile = Some(local_profile.clone()); + + { + let _guard = ThreadTracker::tracking(payload_local); + let start = Instant::now(); + record_write_profile(SpillTarget::Local, &start, 128); + } + + assert_eq!( + local_profile.load_profile(ProfileStatisticsName::LocalSpillWriteCount), + 1, + ); + assert_eq!( + local_profile.load_profile(ProfileStatisticsName::LocalSpillWriteBytes), + 128, + ); + + // Remote spill metrics + let mut payload_remote = ThreadTracker::new_tracking_payload(); + let remote_profile = create_test_profile(); + payload_remote.profile = Some(remote_profile.clone()); + + { + let _guard = ThreadTracker::tracking(payload_remote); + let start = Instant::now(); + record_write_profile(SpillTarget::Remote, &start, 256); + } + + assert_eq!( + remote_profile.load_profile(ProfileStatisticsName::RemoteSpillWriteCount), + 1, + ); + assert_eq!( + remote_profile.load_profile(ProfileStatisticsName::RemoteSpillWriteBytes), + 256, + ); +} + +#[test] +fn test_spill_profile_read_local_and_remote() { + ThreadTracker::init(); + + // Local spill metrics + let mut payload_local = ThreadTracker::new_tracking_payload(); + let local_profile = create_test_profile(); + payload_local.profile = Some(local_profile.clone()); + + { + let _guard = ThreadTracker::tracking(payload_local); + let start = Instant::now(); + record_read_profile(SpillTarget::Local, &start, 64); + } + + assert_eq!( + local_profile.load_profile(ProfileStatisticsName::LocalSpillReadCount), + 1, + ); + assert_eq!( + local_profile.load_profile(ProfileStatisticsName::LocalSpillReadBytes), + 64, + ); + + // Remote spill metrics + let mut payload_remote = ThreadTracker::new_tracking_payload(); + let remote_profile = create_test_profile(); + 
payload_remote.profile = Some(remote_profile.clone()); + + { + let _guard = ThreadTracker::tracking(payload_remote); + let start = Instant::now(); + record_read_profile(SpillTarget::Remote, &start, 512); + } + + assert_eq!( + remote_profile.load_profile(ProfileStatisticsName::RemoteSpillReadCount), + 1, + ); + assert_eq!( + remote_profile.load_profile(ProfileStatisticsName::RemoteSpillReadBytes), + 512, + ); +} diff --git a/src/query/service/tests/it/spillers/spill_target.rs b/src/query/service/tests/it/spillers/spill_target.rs new file mode 100644 index 0000000000000..5fb05d8262dd4 --- /dev/null +++ b/src/query/service/tests/it/spillers/spill_target.rs @@ -0,0 +1,33 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Integration test for SpillTarget::from_storage_params semantics. + +use databend_common_meta_app::storage::StorageFsConfig; +use databend_common_meta_app::storage::StorageParams; +use databend_query::spillers::SpillTarget; + +#[test] +fn test_spill_target_from_storage_params() { + // Fs backend should be treated as local spill. + let fs_params = StorageParams::Fs(StorageFsConfig { + root: "/tmp/test-root".to_string(), + }); + let target = SpillTarget::from_storage_params(Some(&fs_params)); + assert!(target.is_local()); + + // None (or any non-Fs backend) should be treated as remote spill. 
+ let target_none = SpillTarget::from_storage_params(None); + assert!(!target_none.is_local()); +} diff --git a/src/query/service/tests/it/storages/testdata/configs_table_basic.txt b/src/query/service/tests/it/storages/testdata/configs_table_basic.txt index 8f64dccea60b2..deca56b922ce1 100644 --- a/src/query/service/tests/it/storages/testdata/configs_table_basic.txt +++ b/src/query/service/tests/it/storages/testdata/configs_table_basic.txt @@ -224,10 +224,13 @@ DB.Table: 'system'.'configs', Table: configs-table_id:1, ver:0, Engine: SystemCo | 'query' | 'udfs' | '{"name":"test_builtin_ping","definition":"CREATE OR REPLACE FUNCTION test_builtin_ping (STRING)\\n RETURNS STRING\\n LANGUAGE python\\nHANDLER = \'ping\'\\nADDRESS = \'https://databend.com\';"}' | '' | | 'query' | 'users' | '{"name":"root","auth_type":"no_password","auth_string":null}' | '' | | 'query' | 'warehouse_id' | 'test_warehouse' | '' | +| 'spill' | 'result_set_spilling_disk_quota_ratio' | '0' | '' | +| 'spill' | 'sort_spilling_disk_quota_ratio' | '60' | '' | | 'spill' | 'spill_local_disk_max_bytes' | '18446744073709551615' | '' | | 'spill' | 'spill_local_disk_path' | '' | '' | -| 'spill' | 'spill_local_disk_reserved_space_percentage' | '30.0' | '' | +| 'spill' | 'spill_local_disk_reserved_space_percentage' | '10.0' | '' | | 'spill' | 'storage' | 'null' | '' | +| 'spill' | 'window_partition_spilling_disk_quota_ratio' | '60' | '' | | 'storage' | 'allow_insecure' | 'true' | '' | | 'storage' | 'azblob.account_key' | '' | '' | | 'storage' | 'azblob.account_name' | '' | '' | diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index aeaece5d38c0a..aa38a68805fdd 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -671,13 +671,6 @@ impl DefaultSettings { scope: SettingScope::Both, range: Some(SettingRange::Numeric(0..=100)), }), - ("window_partition_spilling_to_disk_bytes_limit", DefaultSettingValue { - value: UserSettingValue::UInt64(0), - desc: "Sets the maximum amount of local disk in bytes that each window partitioner can use before spilling data to storage during query execution.", - mode: SettingMode::Both, - scope: SettingScope::Both, - range: Some(SettingRange::Numeric(0..=u64::MAX)), - }), ("window_num_partitions", DefaultSettingValue { value: UserSettingValue::UInt64(256), desc: "Sets the number of partitions for window operator.", @@ -706,20 +699,6 @@ impl DefaultSettings { scope: SettingScope::Both, range: Some(SettingRange::Numeric(0..=100)), }), - ("sort_spilling_to_disk_bytes_limit", DefaultSettingValue { - value: UserSettingValue::UInt64(0), - desc: "Sets the maximum amount of local disk in bytes that sorter can use before spilling data to storage during one query execution.", - mode: SettingMode::Both, - scope: SettingScope::Both, - range: Some(SettingRange::Numeric(0..=u64::MAX)), - }), - ("result_set_spilling_to_disk_bytes_limit", DefaultSettingValue { - value: UserSettingValue::UInt64(0), - desc: "Sets the maximum amount of local disk in bytes that result set can use before spilling data to storage during one query execution.", - mode: SettingMode::Both, - scope: SettingScope::Both, - range: Some(SettingRange::Numeric(0..=u64::MAX)), - }), ("enable_result_set_spilling", DefaultSettingValue { value: UserSettingValue::UInt64(0), desc: "Enable spilling result set data to storage when memory usage exceeds the threshold.", diff --git a/src/query/settings/src/settings_getter_setter.rs 
b/src/query/settings/src/settings_getter_setter.rs index 6057058270e25..2cf2d7453ff32 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -497,10 +497,6 @@ impl Settings { Ok(self.try_get_u64("aggregate_spilling_memory_ratio")? as usize) } - pub fn get_window_partition_spilling_to_disk_bytes_limit(&self) -> Result { - Ok(self.try_get_u64("window_partition_spilling_to_disk_bytes_limit")? as usize) - } - pub fn get_window_partition_spilling_memory_ratio(&self) -> Result { Ok(self.try_get_u64("window_partition_spilling_memory_ratio")? as usize) } @@ -525,14 +521,6 @@ impl Settings { Ok(self.try_get_u64("sort_spilling_memory_ratio")? as usize) } - pub fn get_sort_spilling_to_disk_bytes_limit(&self) -> Result { - Ok(self.try_get_u64("sort_spilling_to_disk_bytes_limit")? as usize) - } - - pub fn get_result_set_spilling_to_disk_bytes_limit(&self) -> Result { - Ok(self.try_get_u64("result_set_spilling_to_disk_bytes_limit")? as usize) - } - pub fn get_enable_result_set_spilling(&self) -> Result { Ok(self.try_get_u64("enable_result_set_spilling")? == 1) } diff --git a/src/query/storages/common/cache/src/temp_dir.rs b/src/query/storages/common/cache/src/temp_dir.rs index b86fe2411733f..384467d770814 100644 --- a/src/query/storages/common/cache/src/temp_dir.rs +++ b/src/query/storages/common/cache/src/temp_dir.rs @@ -137,6 +137,11 @@ impl TempDirManager { let mut group = self.group.lock().unwrap(); if group.dirs.remove(&path).is_some() { + log::debug!( + target: "spill-tempdir", + "[SPILL-TEMP] drop_disk_spill_dir removing path={:?}", + path + ); match fs::remove_dir_all(&path) { Ok(_) => return Ok(true), Err(e) if matches!(e.kind(), ErrorKind::NotFound) => {} @@ -171,6 +176,13 @@ impl TempDirManager { .take(limit) .collect::>(); drop(group); + if !to_delete.is_empty() { + log::debug!( + target: "spill-tempdir", + "[SPILL-TEMP] drop_disk_spill_dir_unknown removing={:?}", + to_delete + ); + } for path in &to_delete { fs::remove_dir_all(path)?; } diff --git a/tests/sqllogictests/suites/query/issues/issue_17581.test b/tests/sqllogictests/suites/query/issues/issue_17581.test index 90aee603129e4..6fb2a3cfaf33b 100644 --- a/tests/sqllogictests/suites/query/issues/issue_17581.test +++ b/tests/sqllogictests/suites/query/issues/issue_17581.test @@ -7,9 +7,6 @@ USE test_17581 statement ok SET force_window_data_spill = 1; -statement ok -set window_partition_spilling_to_disk_bytes_limit = 1024 * 1024 * 1024; - statement ok SET force_sort_data_spill = 1; diff --git a/tests/sqllogictests/suites/query/window_function/window_partition_spill.test b/tests/sqllogictests/suites/query/window_function/window_partition_spill.test index cd0613020b8ea..92c93ef474248 100644 --- a/tests/sqllogictests/suites/query/window_function/window_partition_spill.test +++ b/tests/sqllogictests/suites/query/window_function/window_partition_spill.test @@ -7,9 +7,6 @@ USE test_window_partition_spill statement ok SET force_window_data_spill = 1; -statement ok -set window_partition_spilling_to_disk_bytes_limit = 1024 * 1024 * 1024; - statement ok set enable_dio = 1; @@ -38,6 +35,9 @@ FROM ( ---- 14999849576 +statement ok +unset enable_dio; + statement ok set enable_dio = 0; @@ -66,9 +66,6 @@ FROM ( ---- 14999849576 -statement ok -unset enable_dio; - statement ok DROP TABLE IF EXISTS customers; diff --git a/tests/sqllogictests/suites/tpcds/window_spill.test b/tests/sqllogictests/suites/tpcds/window_spill.test index e11627bd572bd..522ccda258b3f 100644 --- 
a/tests/sqllogictests/suites/tpcds/window_spill.test
+++ b/tests/sqllogictests/suites/tpcds/window_spill.test
@@ -8,9 +8,6 @@ use tpcds;
 statement ok
 set max_memory_usage = 1024*1024*200;
 
-statement ok
-set window_partition_spilling_to_disk_bytes_limit = 1024 * 1024 * 1024;
-
 statement ok
 set max_block_size = 2;
 
@@ -30,6 +27,3 @@ unset max_block_size;
 
 statement ok
 UNSET max_memory_usage;
-
-statement ok
-unset window_partition_spilling_to_disk_bytes_limit;
diff --git a/tests/sqllogictests/suites/tpch/spill.test b/tests/sqllogictests/suites/tpch/spill.test
index c6aa2cea290b0..c624d7918acae 100644
--- a/tests/sqllogictests/suites/tpch/spill.test
+++ b/tests/sqllogictests/suites/tpch/spill.test
@@ -20,9 +20,6 @@ SET force_aggregate_data_spill = 1;
 statement ok
 set join_spilling_buffer_threshold_per_proc_mb = 1;
 
-statement ok
-set window_partition_spilling_to_disk_bytes_limit = 1024 * 1024 * 1024;
-
 
 # TPC-H TEST
 include ./queries.test
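
Editor's note on the replaced settings: the per-query byte limits removed in the sqllogictests above (window_partition_spilling_to_disk_bytes_limit, sort_spilling_to_disk_bytes_limit, result_set_spilling_to_disk_bytes_limit) are superseded by the percentage ratios added under [spill] (window_partition_spilling_disk_quota_ratio, sort_spilling_disk_quota_ratio, result_set_spilling_disk_quota_ratio), which apply against spill_local_disk_max_bytes. The sketch below only illustrates that ratio arithmetic; operator_spill_budget is a made-up helper name, not the Databend implementation.

/// Hypothetical helper (not Databend's code): derive a per-operator local
/// spill budget in bytes from the global cap and a 0-100 percentage ratio.
fn operator_spill_budget(global_max_bytes: u64, quota_ratio_percent: u64) -> u64 {
    // Clamp out-of-range ratios instead of erroring; a ratio of 0 leaves the
    // operator class with no local spill budget at all.
    let ratio = quota_ratio_percent.min(100);
    // Widen to u128 so a cap of u64::MAX cannot overflow during the multiply.
    ((global_max_bytes as u128 * ratio as u128) / 100) as u64
}

fn main() {
    let cap = 1_073_741_824u64; // e.g. a 1 GiB spill_local_disk_max_bytes
    // A 20% window ratio leaves roughly 200 MiB for window-partition spills.
    assert_eq!(operator_spill_budget(cap, 20), 214_748_364);
    // Ratios above 100 are clamped to the full cap.
    assert_eq!(operator_spill_budget(cap, 150), cap);
    // Ratio 0 (the result-set default in this diff) disables the local budget.
    assert_eq!(operator_spill_budget(cap, 0), 0);
    println!("20% of 1 GiB = {} bytes", operator_spill_budget(cap, 20));
}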
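
The new spill_fallback.rs test keeps spilling blocks until the small (32 KB) local quota is used up and then expects the next write to land on Location::Remote. The std-only model below sketches that accounting under stated assumptions; LocalQuota and SpillLoc are invented names for illustration and are not the Spiller or TempDirManager API.

#[derive(Debug, PartialEq)]
enum SpillLoc {
    Local(u64),  // bytes that landed in the local temp dir
    Remote(u64), // bytes that fell back to the remote spill operator
}

/// Hypothetical quota tracker: keep spilling locally while the quota still
/// covers the whole block, otherwise route the block to remote storage.
struct LocalQuota {
    limit_bytes: u64,
    used_bytes: u64,
}

impl LocalQuota {
    fn new(limit_bytes: u64) -> Self {
        Self { limit_bytes, used_bytes: 0 }
    }

    fn spill(&mut self, bytes: u64) -> SpillLoc {
        if self.used_bytes.saturating_add(bytes) <= self.limit_bytes {
            self.used_bytes += bytes;
            SpillLoc::Local(bytes)
        } else {
            SpillLoc::Remote(bytes)
        }
    }
}

fn main() {
    // 32 KiB quota with 12 KiB blocks: two local writes fit, the third falls back.
    let mut quota = LocalQuota::new(32 * 1024);
    assert_eq!(quota.spill(12 * 1024), SpillLoc::Local(12 * 1024));
    assert_eq!(quota.spill(12 * 1024), SpillLoc::Local(12 * 1024));
    assert_eq!(quota.spill(12 * 1024), SpillLoc::Remote(12 * 1024));
    println!("local bytes used before fallback: {}", quota.used_bytes);
}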
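
spill_target.rs only asserts the classification rule: an Fs storage backend is a local spill target, and anything else, including no configured spill storage at all, is remote. A self-contained sketch of that rule follows; StorageKind is an invented stand-in for the real StorageParams type.

/// Hypothetical stand-in for the configured spill storage backend; the real
/// test matches on databend_common_meta_app::storage::StorageParams.
#[allow(dead_code)]
enum StorageKind {
    Fs { root: String },
    S3 { bucket: String },
}

#[derive(Debug, PartialEq)]
enum Target {
    Local,
    Remote,
}

/// Only a filesystem backend counts as a local spill target; any other
/// backend, or no explicit spill storage, is treated as remote.
fn target_from_storage(params: Option<&StorageKind>) -> Target {
    match params {
        Some(StorageKind::Fs { .. }) => Target::Local,
        _ => Target::Remote,
    }
}

fn main() {
    let fs = StorageKind::Fs { root: "/tmp/test-root".to_string() };
    assert_eq!(target_from_storage(Some(&fs)), Target::Local);

    let s3 = StorageKind::S3 { bucket: "spill-bucket".to_string() };
    assert_eq!(target_from_storage(Some(&s3)), Target::Remote);
    assert_eq!(target_from_storage(None), Target::Remote);
    println!("classification matches the spill_target test expectations");
}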
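
spill_profile.rs checks that record_write_profile and record_read_profile bump the Local*/Remote* spill counters on the profile attached to the current tracking payload. The sketch below models per-target read/write counters with plain atomics under stated assumptions; SpillSide, SpillCounters, and SpillProfile are illustrative names, not the Profile or ProfileStatisticsName API.

use std::sync::atomic::{AtomicU64, Ordering};

/// Illustrative per-target spill counters, loosely modelling the
/// {Local,Remote}Spill{Write,Read}{Count,Bytes} statistics the tests assert on.
#[derive(Default)]
struct SpillCounters {
    write_count: AtomicU64,
    write_bytes: AtomicU64,
    read_count: AtomicU64,
    read_bytes: AtomicU64,
}

#[derive(Default)]
struct SpillProfile {
    local: SpillCounters,
    remote: SpillCounters,
}

enum SpillSide {
    Local,
    Remote,
}

impl SpillProfile {
    fn counters(&self, side: &SpillSide) -> &SpillCounters {
        match side {
            SpillSide::Local => &self.local,
            SpillSide::Remote => &self.remote,
        }
    }

    // One write event: bump the count and add the payload size for that side.
    fn record_write(&self, side: &SpillSide, bytes: u64) {
        let c = self.counters(side);
        c.write_count.fetch_add(1, Ordering::Relaxed);
        c.write_bytes.fetch_add(bytes, Ordering::Relaxed);
    }

    // One read event, mirrored onto the read counters.
    fn record_read(&self, side: &SpillSide, bytes: u64) {
        let c = self.counters(side);
        c.read_count.fetch_add(1, Ordering::Relaxed);
        c.read_bytes.fetch_add(bytes, Ordering::Relaxed);
    }
}

fn main() {
    let profile = SpillProfile::default();
    profile.record_write(&SpillSide::Local, 128);
    profile.record_write(&SpillSide::Remote, 256);
    profile.record_read(&SpillSide::Local, 64);
    profile.record_read(&SpillSide::Remote, 512);

    assert_eq!(profile.local.write_count.load(Ordering::Relaxed), 1);
    assert_eq!(profile.local.write_bytes.load(Ordering::Relaxed), 128);
    assert_eq!(profile.remote.write_bytes.load(Ordering::Relaxed), 256);
    assert_eq!(profile.local.read_bytes.load(Ordering::Relaxed), 64);
    assert_eq!(profile.remote.read_bytes.load(Ordering::Relaxed), 512);
    println!("per-target counters line up with the spill_profile assertions");
}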