From 040e8fab70eacf52f89d5c3517556e31390a1a43 Mon Sep 17 00:00:00 2001 From: coldWater Date: Tue, 8 Oct 2024 23:21:59 +0800 Subject: [PATCH 1/3] feat(query): window partition by spill to disk (#16441) * dma write Signed-off-by: coldWater * partition spill to disk Signed-off-by: coldWater * fix Signed-off-by: coldWater * refine Signed-off-by: coldWater * temp file Signed-off-by: coldWater * refactor spill Signed-off-by: coldWater * clear temp dir Signed-off-by: coldWater * move dma Signed-off-by: coldWater * config Signed-off-by: coldWater * fix Signed-off-by: coldWater * bytes limit Signed-off-by: coldWater * rustix Signed-off-by: coldWater * rustix Signed-off-by: coldWater * temp Signed-off-by: coldWater * Location Signed-off-by: coldWater * builder Signed-off-by: coldWater * fix Signed-off-by: coldWater * SpillerConfig Signed-off-by: coldWater * temp dir Signed-off-by: coldWater * fix Signed-off-by: coldWater * drop_disk_spill_dir_unknown Signed-off-by: coldWater * drop residual temp dir Signed-off-by: coldWater * no copy read Signed-off-by: coldWater * rename reserved_disk_ratio Signed-off-by: coldWater * record the profile of LocalSpill and RemoteSpill separately Signed-off-by: coldWater * defalut disable Signed-off-by: coldWater * update setting Signed-off-by: coldWater * fix Signed-off-by: coldWater * fix dma Signed-off-by: coldWater * fix Signed-off-by: coldWater * refactor dma Signed-off-by: coldWater * change config Signed-off-by: coldWater * Alignment Signed-off-by: coldWater * cloud test Signed-off-by: coldWater * x Signed-off-by: coldWater * clean Signed-off-by: coldWater * spill_local_disk_path Signed-off-by: coldWater * fix Signed-off-by: coldWater * fix Signed-off-by: coldWater * fix Signed-off-by: coldWater --------- Signed-off-by: coldWater --- Cargo.lock | 74 +-- Cargo.toml | 1 + src/common/arrow/src/arrow/array/mod.rs | 2 +- src/common/base/Cargo.toml | 3 +- src/common/base/src/base/dma.rs | 495 ++++++++++++++++++ src/common/base/src/base/mod.rs | 5 + src/common/base/src/lib.rs | 1 + .../base/src/runtime/profile/profiles.rs | 113 ++-- src/common/tracing/Cargo.toml | 2 +- src/query/config/src/config.rs | 111 +++- src/query/config/src/inner.rs | 28 + src/query/config/src/lib.rs | 1 + src/query/config/src/mask.rs | 1 + src/query/service/src/global_services.rs | 2 + .../src/interpreters/hook/vacuum_hook.rs | 19 + .../service/src/interpreters/interpreter.rs | 11 +- .../src/pipelines/builders/builder_sort.rs | 15 +- .../src/pipelines/builders/builder_window.rs | 24 +- .../serde/transform_aggregate_spill_writer.rs | 18 +- ...transform_exchange_aggregate_serializer.rs | 18 +- .../transform_exchange_group_by_serializer.rs | 18 +- .../serde/transform_group_by_spill_writer.rs | 18 +- .../serde/transform_spill_reader.rs | 6 +- .../transforms/hash_join/hash_join_spiller.rs | 13 +- .../transforms/transform_sort_spill.rs | 18 +- .../transform_window_partition_collect.rs | 33 +- .../partition/window_partition_buffer.rs | 20 +- src/query/service/src/spillers/mod.rs | 1 + src/query/service/src/spillers/spiller.rs | 397 ++++++++------ src/query/service/src/test_kits/fixture.rs | 3 +- .../service/tests/it/spillers/spiller.rs | 15 +- src/query/settings/src/settings_default.rs | 12 + .../settings/src/settings_getter_setter.rs | 8 + src/query/storages/common/cache/Cargo.toml | 1 + src/query/storages/common/cache/src/lib.rs | 3 + .../storages/common/cache/src/temp_dir.rs | 410 +++++++++++++++ .../window_partition_spill.test | 3 + tests/sqllogictests/suites/tpcds/spill.test | 3 + 
tests/sqllogictests/suites/tpch/spill.test | 3 + 39 files changed, 1602 insertions(+), 327 deletions(-) create mode 100644 src/common/base/src/base/dma.rs create mode 100644 src/query/storages/common/cache/src/temp_dir.rs diff --git a/Cargo.lock b/Cargo.lock index 45c3e8cbc860..496eb42ab184 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -757,7 +757,7 @@ dependencies = [ "futures-lite 2.3.0", "parking", "polling 3.7.3", - "rustix 0.38.34", + "rustix 0.38.37", "slab", "tracing", "windows-sys 0.59.0", @@ -1951,7 +1951,7 @@ dependencies = [ "io-lifetimes 2.0.3", "ipnet", "maybe-owned", - "rustix 0.38.34", + "rustix 0.38.37", "windows-sys 0.52.0", "winx", ] @@ -1975,7 +1975,7 @@ dependencies = [ "cap-primitives", "io-extras", "io-lifetimes 2.0.3", - "rustix 0.38.34", + "rustix 0.38.37", ] [[package]] @@ -1988,7 +1988,7 @@ dependencies = [ "cap-primitives", "iana-time-zone", "once_cell", - "rustix 0.38.34", + "rustix 0.38.37", "winx", ] @@ -3265,6 +3265,7 @@ dependencies = [ "rand 0.8.5", "regex", "replace_with", + "rustix 0.38.37", "semver", "serde", "serde_json", @@ -5386,6 +5387,7 @@ dependencies = [ "log", "parking_lot 0.12.3", "rayon", + "rustix 0.38.37", "siphasher", "tempfile", ] @@ -6408,7 +6410,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e5768da2206272c81ef0b5e951a41862938a6070da63bcea197899942d3b947" dependencies = [ "cfg-if", - "rustix 0.38.34", + "rustix 0.38.37", "windows-sys 0.52.0", ] @@ -6689,7 +6691,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "033b337d725b97690d86893f9de22b67b80dcc4e9ad815f348254c38119db8fb" dependencies = [ "io-lifetimes 2.0.3", - "rustix 0.38.34", + "rustix 0.38.37", "windows-sys 0.52.0", ] @@ -6709,7 +6711,7 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7e180ac76c23b45e767bd7ae9579bc0bb458618c4bc71835926e098e61d15f8" dependencies = [ - "rustix 0.38.34", + "rustix 0.38.37", "windows-sys 0.52.0", ] @@ -7545,7 +7547,7 @@ dependencies = [ "itoa", "libc", "memmap2 0.9.4", - "rustix 0.38.34", + "rustix 0.38.37", "smallvec", "thiserror", ] @@ -7707,7 +7709,7 @@ dependencies = [ "gix-command", "gix-config-value", "parking_lot 0.12.3", - "rustix 0.38.34", + "rustix 0.38.37", "thiserror", ] @@ -8520,7 +8522,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.5.7", + "socket2 0.4.10", "tokio", "tower-service", "tracing", @@ -9518,9 +9520,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.155" +version = "0.2.158" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" +checksum = "d8adc4bb1803a324070e64a98ae98f38934d91957a99cfb3a43dcbc01bc56439" [[package]] name = "libflate" @@ -9553,7 +9555,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" dependencies = [ "cfg-if", - "windows-targets 0.52.6", + "windows-targets 0.48.5", ] [[package]] @@ -9933,7 +9935,7 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b2cffa4ad52c6f791f4f8b15f0c05f9824b2ced1160e88cc393d64fff9a8ac64" dependencies = [ - "rustix 0.38.34", + "rustix 0.38.37", ] [[package]] @@ -10958,7 +10960,7 @@ dependencies = [ "serde_json", "thiserror", "tokio", - "tonic 0.12.2", + "tonic 0.12.3", ] [[package]] @@ -10972,7 +10974,7 @@ dependencies = [ "opentelemetry_sdk", "prost 0.13.1", "serde", - "tonic 0.12.2", + "tonic 
0.12.3", ] [[package]] @@ -11660,7 +11662,7 @@ dependencies = [ "concurrent-queue", "hermit-abi 0.4.0", "pin-project-lite", - "rustix 0.38.34", + "rustix 0.38.37", "tracing", "windows-sys 0.59.0", ] @@ -11911,7 +11913,7 @@ dependencies = [ "hex", "lazy_static", "procfs-core", - "rustix 0.38.34", + "rustix 0.38.37", ] [[package]] @@ -12063,7 +12065,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "18bec9b0adc4eba778b33684b7ba3e7137789434769ee3ce3930463ef904cfca" dependencies = [ "anyhow", - "itertools 0.13.0", + "itertools 0.10.5", "proc-macro2", "quote", "syn 2.0.58", @@ -12207,7 +12209,7 @@ dependencies = [ "indoc", "libc", "memoffset", - "parking_lot 0.12.3", + "parking_lot 0.11.2", "portable-atomic", "pyo3-build-config", "pyo3-ffi", @@ -13232,9 +13234,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.34" +version = "0.38.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f" +checksum = "8acb788b847c24f28525660c4d7758620a7210875711f79e7f663cc152726811" dependencies = [ "bitflags 2.6.0", "errno", @@ -14755,7 +14757,7 @@ dependencies = [ "cap-std", "fd-lock", "io-lifetimes 2.0.3", - "rustix 0.38.34", + "rustix 0.38.37", "windows-sys 0.52.0", "winx", ] @@ -14945,7 +14947,7 @@ dependencies = [ "cfg-if", "fastrand 2.1.0", "once_cell", - "rustix 0.38.34", + "rustix 0.38.37", "windows-sys 0.59.0", ] @@ -14974,7 +14976,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "21bebf2b7c9e0a515f6e0f8c51dc0f8e4696391e6f1ff30379559f8365fb0df7" dependencies = [ - "rustix 0.38.34", + "rustix 0.38.37", "windows-sys 0.48.0", ] @@ -15242,9 +15244,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.15" +version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af" +checksum = "4f4e6ce100d0eb49a2734f8c0812bcd324cf357d21810932c5df6b96ef2b86f1" dependencies = [ "futures-core", "pin-project-lite", @@ -15377,9 +15379,9 @@ dependencies = [ [[package]] name = "tonic" -version = "0.12.2" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6f6ba989e4b2c58ae83d862d3a3e27690b6e3ae630d0deb59f3697f32aa88ad" +checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" dependencies = [ "async-stream", "async-trait", @@ -15576,7 +15578,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ "cfg-if", - "rand 0.8.5", + "rand 0.7.3", "static_assertions", ] @@ -16069,7 +16071,7 @@ dependencies = [ "io-lifetimes 2.0.3", "log", "once_cell", - "rustix 0.38.34", + "rustix 0.38.37", "system-interface", "thiserror", "tracing", @@ -16235,7 +16237,7 @@ dependencies = [ "once_cell", "paste", "rayon", - "rustix 0.38.34", + "rustix 0.38.37", "semver", "serde", "serde_derive", @@ -16278,7 +16280,7 @@ dependencies = [ "bincode 1.3.3", "directories-next", "log", - "rustix 0.38.34", + "rustix 0.38.37", "serde", "serde_derive", "sha2", @@ -16367,7 +16369,7 @@ dependencies = [ "anyhow", "cc", "cfg-if", - "rustix 0.38.34", + "rustix 0.38.37", "wasmtime-asm-macros", "wasmtime-versioned-export-macros", "windows-sys 0.52.0", @@ -16381,7 +16383,7 @@ checksum = "983ca409f2cd66385ce49486c022da0128acb7910c055beb5230998b49c6084c" dependencies = [ "object 0.33.0", "once_cell", - 
"rustix 0.38.34", + "rustix 0.38.37", "wasmtime-versioned-export-macros", ] @@ -16414,7 +16416,7 @@ dependencies = [ "memoffset", "paste", "psm", - "rustix 0.38.34", + "rustix 0.38.37", "sptr", "wasm-encoder 0.202.0", "wasmtime-asm-macros", diff --git a/Cargo.toml b/Cargo.toml index b0b0a7f24b78..683d1c2e8429 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -247,6 +247,7 @@ http = "1" itertools = "0.10.5" jsonb = "0.4.3" jwt-simple = "0.11.0" +libc = { version = "0.2.158" } match-template = "0.0.1" mysql_async = { version = "0.34", default-features = false, features = ["native-tls-tls"] } object_store_opendal = "0.46" diff --git a/src/common/arrow/src/arrow/array/mod.rs b/src/common/arrow/src/arrow/array/mod.rs index 0b6aab51d50d..3f893ebaf15f 100644 --- a/src/common/arrow/src/arrow/array/mod.rs +++ b/src/common/arrow/src/arrow/array/mod.rs @@ -31,7 +31,7 @@ //! to a concrete struct based on [`PhysicalType`](crate::arrow::datatypes::PhysicalType) available from [`Array::data_type`]. //! All immutable arrays are backed by [`Buffer`](crate::arrow::buffer::Buffer) and thus cloning and slicing them is `O(1)`. //! -//! Most arrays contain a [`MutableArray`] counterpart that is neither clonable nor sliceable, but +//! Most arrays contain a [`MutableArray`] counterpart that is neither cloneable nor sliceable, but //! can be operated in-place. use std::any::Any; use std::sync::Arc; diff --git a/src/common/base/Cargo.toml b/src/common/base/Cargo.toml index 62a88b3efce1..ab434f8558db 100644 --- a/src/common/base/Cargo.toml +++ b/src/common/base/Cargo.toml @@ -31,7 +31,7 @@ databend-common-exception = { workspace = true } enquote = "1.1.0" fastrace = { workspace = true } futures = { workspace = true } -libc = "0.2.153" +libc = { workspace = true } log = { workspace = true } logcall = { workspace = true } micromarshal = "0.5.0" @@ -50,6 +50,7 @@ prometheus-parse = "0.2.3" rand = { workspace = true, features = ["serde1"] } regex = { workspace = true } replace_with = "0.1.7" +rustix = "0.38.37" semver = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } diff --git a/src/common/base/src/base/dma.rs b/src/common/base/src/base/dma.rs new file mode 100644 index 000000000000..063b9d5467fd --- /dev/null +++ b/src/common/base/src/base/dma.rs @@ -0,0 +1,495 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::alloc::AllocError; +use std::alloc::Allocator; +use std::alloc::Global; +use std::alloc::Layout; +use std::io; +use std::io::IoSlice; +use std::io::SeekFrom; +use std::ops::Range; +use std::os::fd::BorrowedFd; +use std::os::unix::io::AsRawFd; +use std::path::Path; +use std::ptr::Alignment; +use std::ptr::NonNull; + +use rustix::fs::OFlags; +use tokio::fs::File; +use tokio::io::AsyncSeekExt; + +use crate::runtime::spawn_blocking; + +unsafe impl Send for DmaAllocator {} + +pub struct DmaAllocator(Alignment); + +impl DmaAllocator { + pub fn new(align: Alignment) -> DmaAllocator { + DmaAllocator(align) + } + + fn real_layout(&self, layout: Layout) -> Layout { + Layout::from_size_align(layout.size(), self.0.as_usize()).unwrap() + } + + fn real_cap(&self, cap: usize) -> usize { + align_up(self.0, cap) + } +} + +unsafe impl Allocator for DmaAllocator { + fn allocate(&self, layout: Layout) -> Result, AllocError> { + Global {}.allocate(self.real_layout(layout)) + } + + fn allocate_zeroed(&self, layout: Layout) -> Result, AllocError> { + Global {}.allocate_zeroed(self.real_layout(layout)) + } + + unsafe fn grow( + &self, + ptr: NonNull, + old_layout: Layout, + new_layout: Layout, + ) -> Result, AllocError> { + Global {}.grow( + ptr, + self.real_layout(old_layout), + self.real_layout(new_layout), + ) + } + + unsafe fn grow_zeroed( + &self, + ptr: NonNull, + old_layout: Layout, + new_layout: Layout, + ) -> Result, AllocError> { + Global {}.grow_zeroed( + ptr, + self.real_layout(old_layout), + self.real_layout(new_layout), + ) + } + + unsafe fn deallocate(&self, ptr: std::ptr::NonNull, layout: Layout) { + Global {}.deallocate(ptr, self.real_layout(layout)) + } +} + +type DmaBuffer = Vec; + +pub fn dma_buffer_as_vec(mut buf: DmaBuffer) -> Vec { + let ptr = buf.as_mut_ptr(); + let len = buf.len(); + let cap = buf.allocator().real_cap(buf.capacity()); + std::mem::forget(buf); + + unsafe { Vec::from_raw_parts(ptr, len, cap) } +} + +/// A `DmaFile` is similar to a `File`, but it is opened with the `O_DIRECT` file in order to +/// perform direct IO. +struct DmaFile { + fd: File, + alignment: Alignment, + buf: Option, +} + +impl DmaFile { + /// Attempts to open a file in read-only mode. + async fn open(path: impl AsRef) -> io::Result { + let file = File::options() + .read(true) + .custom_flags(OFlags::DIRECT.bits() as i32) + .open(path) + .await?; + + open_dma(file).await + } + + /// Opens a file in write-only mode. + async fn create(path: impl AsRef) -> io::Result { + let file = File::options() + .write(true) + .create(true) + .truncate(true) + .custom_flags((OFlags::DIRECT | OFlags::EXCL).bits() as i32) + .open(path) + .await?; + + open_dma(file).await + } + + fn set_buffer(&mut self, buf: DmaBuffer) { + self.buf = Some(buf) + } + + /// Aligns `value` up to the memory alignment requirement for this file. + pub fn align_up(&self, value: usize) -> usize { + align_up(self.alignment, value) + } + + /// Aligns `value` down to the memory alignment requirement for this file. + pub fn align_down(&self, value: usize) -> usize { + align_down(self.alignment, value) + } + + /// Return the alignment requirement for this file. 
The returned alignment value can be used + /// to allocate a buffer to use with this file: + #[expect(dead_code)] + pub fn alignment(&self) -> Alignment { + self.alignment + } + + fn buffer(&self) -> &DmaBuffer { + self.buf.as_ref().unwrap() + } + + fn mut_buffer(&mut self) -> &mut DmaBuffer { + self.buf.as_mut().unwrap() + } + + fn write_direct(&mut self) -> io::Result { + let buf = self.buffer(); + match rustix::io::write(&self.fd, buf) { + Ok(n) => { + if n != buf.len() { + return Err(io::Error::new(io::ErrorKind::Other, "short write")); + } + self.mut_buffer().clear(); + Ok(n) + } + Err(e) => Err(e.into()), + } + } + + fn read_direct(&mut self, n: usize) -> io::Result { + let Self { fd, buf, .. } = self; + let buf = buf.as_mut().unwrap(); + if n > buf.capacity() - buf.len() { + return Err(io::Error::new(io::ErrorKind::Other, "buf not sufficient")); + } + let start = buf.len(); + unsafe { buf.set_len(buf.len() + n) }; + match rustix::io::read(fd, &mut (*buf)[start..]) { + Ok(n) => { + buf.truncate(start + n); + Ok(n) + } + Err(e) => Err(e.into()), + } + } + + fn truncate(&self, length: usize) -> io::Result<()> { + rustix::fs::ftruncate(&self.fd, length as u64).map_err(|e| e.into()) + } + + async fn seek(&mut self, pos: SeekFrom) -> io::Result { + self.fd.seek(pos).await + } +} + +pub fn align_up(alignment: Alignment, value: usize) -> usize { + (value + alignment.as_usize() - 1) & alignment.mask() +} + +pub fn align_down(alignment: Alignment, value: usize) -> usize { + value & alignment.mask() +} + +async fn open_dma(file: File) -> io::Result { + let stat = fstatvfs(&file).await?; + let alignment = Alignment::new(stat.f_bsize.max(512) as usize).unwrap(); + + Ok(DmaFile { + fd: file, + alignment, + buf: None, + }) +} + +async fn fstatvfs(file: &File) -> io::Result { + let fd = file.as_raw_fd(); + asyncify(move || { + let fd = unsafe { BorrowedFd::borrow_raw(fd) }; + rustix::fs::fstatvfs(fd).map_err(|e| e.into()) + }) + .await +} + +async fn asyncify(f: F) -> io::Result +where + F: FnOnce() -> io::Result + Send + 'static, + T: Send + 'static, +{ + match spawn_blocking(f).await { + Ok(res) => res, + Err(_) => Err(io::Error::new( + io::ErrorKind::Other, + "background task failed", + )), + } +} + +pub async fn dma_write_file_vectored<'a>( + path: impl AsRef, + bufs: &'a [IoSlice<'a>], +) -> io::Result { + let mut file = DmaFile::create(path.as_ref()).await?; + + let file_length = bufs.iter().map(|buf| buf.len()).sum(); + if file_length == 0 { + return Ok(0); + } + + const BUFFER_SIZE: usize = 1024 * 1024; + let buffer_size = BUFFER_SIZE.min(file_length); + + let dma_buf = Vec::with_capacity_in( + file.align_up(buffer_size), + DmaAllocator::new(file.alignment), + ); + file.set_buffer(dma_buf); + + for src in bufs { + let mut src = &src[..]; + + while !src.is_empty() { + let dst = file.buffer(); + if dst.capacity() == dst.len() { + file = asyncify(move || file.write_direct().map(|_| file)).await?; + } + + let dst = file.mut_buffer(); + let remaining = dst.capacity() - dst.len(); + let n = src.len().min(remaining); + let (left, right) = src.split_at(n); + dst.extend_from_slice(left); + src = right; + } + } + + let len = file.buffer().len(); + if len > 0 { + let align_up = file.align_up(len); + if align_up == len { + asyncify(move || file.write_direct()).await?; + } else { + let dst = file.mut_buffer(); + unsafe { dst.set_len(align_up) } + asyncify(move || { + file.write_direct()?; + file.truncate(file_length) + }) + .await?; + } + } + + Ok(file_length) +} + +pub async fn dma_read_file( + path: 
impl AsRef, + mut writer: impl io::Write, +) -> io::Result { + const BUFFER_SIZE: usize = 1024 * 1024; + let mut file = DmaFile::open(path.as_ref()).await?; + let buf = Vec::with_capacity_in( + file.align_up(BUFFER_SIZE), + DmaAllocator::new(file.alignment), + ); + file.set_buffer(buf); + + let mut n = 0; + loop { + file = asyncify(move || { + let buf = file.buffer(); + let remain = buf.capacity() - buf.len(); + file.read_direct(remain).map(|_| file) + }) + .await?; + + let buf = file.buffer(); + if buf.is_empty() { + return Ok(n); + } + n += buf.len(); + writer.write_all(buf)?; + // WARN: Is it possible to have a short read but not eof? + let eof = buf.capacity() > buf.len(); + unsafe { file.mut_buffer().set_len(0) } + if eof { + return Ok(n); + } + } +} + +pub async fn dma_read_file_range( + path: impl AsRef, + range: Range, +) -> io::Result<(DmaBuffer, Range)> { + let mut file = DmaFile::open(path.as_ref()).await?; + + let align_start = file.align_down(range.start as usize); + let align_end = file.align_up(range.end as usize); + + let buf = Vec::with_capacity_in(align_end - align_start, DmaAllocator::new(file.alignment)); + file.set_buffer(buf); + + if align_start != 0 { + let offset = file.seek(SeekFrom::Start(align_start as u64)).await?; + if offset as usize != align_start { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "range out of range", + )); + } + } + + let mut n; + loop { + (file, n) = asyncify(move || { + let buf = file.buffer(); + let remain = buf.capacity() - buf.len(); + file.read_direct(remain).map(|n| (file, n)) + }) + .await?; + if align_start + file.buffer().len() >= range.end as usize { + break; + } + if n == 0 { + return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "")); + } + } + + let rt_range = range.start as usize - align_start..range.end as usize - align_start; + Ok((file.buf.unwrap(), rt_range)) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_read_write() { + let _ = std::fs::remove_file("test_file"); + + run_test(0).await.unwrap(); + run_test(100).await.unwrap(); + run_test(200).await.unwrap(); + + run_test(4096 - 1).await.unwrap(); + run_test(4096).await.unwrap(); + run_test(4096 + 1).await.unwrap(); + + run_test(4096 * 2 - 1).await.unwrap(); + run_test(4096 * 2).await.unwrap(); + run_test(4096 * 2 + 1).await.unwrap(); + + run_test(1024 * 1024 * 3 - 1).await.unwrap(); + run_test(1024 * 1024 * 3).await.unwrap(); + run_test(1024 * 1024 * 3 + 1).await.unwrap(); + } + + async fn run_test(n: usize) -> io::Result<()> { + let filename = "test_file"; + let want = (0..n).map(|i| (i % 256) as u8).collect::>(); + + let bufs = vec![IoSlice::new(&want)]; + let length = dma_write_file_vectored(filename, &bufs).await?; + + assert_eq!(length, want.len()); + + let mut got = Vec::new(); + + let length = dma_read_file(filename, &mut got).await?; + assert_eq!(length, want.len()); + assert_eq!(got, want); + + let (buf, range) = dma_read_file_range(filename, 0..length as u64).await?; + assert_eq!(&buf[range], &want); + + std::fs::remove_file(filename)?; + Ok(()) + } + + #[tokio::test] + async fn test_range_read() { + let filename = "test_file2"; + let _ = std::fs::remove_file(filename); + let n: usize = 4096 * 2; + + let want = (0..n).map(|i| (i % 256) as u8).collect::>(); + + let bufs = vec![IoSlice::new(&want)]; + dma_write_file_vectored(filename, &bufs).await.unwrap(); + + let got = dma_read_file_range(filename, 0..10).await.unwrap(); + let got = got.0[got.1].to_vec(); + assert_eq!(&want[0..10], got); + + let got = 
dma_read_file_range(filename, 10..30).await.unwrap(); + let got = got.0[got.1].to_vec(); + assert_eq!(&want[10..30], got); + + let got = dma_read_file_range(filename, 4096 - 5..4096 + 5) + .await + .unwrap(); + let got = got.0[got.1].to_vec(); + assert_eq!(&want[4096 - 5..4096 + 5], got); + + let got = dma_read_file_range(filename, 4096..4096 + 5).await.unwrap(); + let got = got.0[got.1].to_vec(); + assert_eq!(&want[4096..4096 + 5], got); + + let got = dma_read_file_range(filename, 4096 * 2 - 5..4096 * 2) + .await + .unwrap(); + let got = got.0[got.1].to_vec(); + assert_eq!(&want[4096 * 2 - 5..4096 * 2], got); + + let _ = std::fs::remove_file(filename); + } + + #[tokio::test] + async fn test_read_direct() { + let filename = "test_file3"; + let _ = std::fs::remove_file(filename); + let stat = rustix::fs::statvfs(".").unwrap(); + let alignment = 512.max(stat.f_bsize as usize); + let file_size: usize = alignment * 2; + + let want = (0..file_size).map(|i| (i % 256) as u8).collect::>(); + + let bufs = vec![IoSlice::new(&want)]; + dma_write_file_vectored(filename, &bufs).await.unwrap(); + + let mut file = DmaFile::open(filename).await.unwrap(); + let buf = Vec::with_capacity_in(file_size, DmaAllocator::new(file.alignment)); + file.set_buffer(buf); + + let got = file.read_direct(alignment).unwrap(); + assert_eq!(alignment, got); + assert_eq!(&want[0..alignment], &**file.buffer()); + + let got = file.read_direct(alignment).unwrap(); + assert_eq!(alignment, got); + assert_eq!(&want, &**file.buffer()); + + let _ = std::fs::remove_file(filename); + } +} diff --git a/src/common/base/src/base/mod.rs b/src/common/base/src/base/mod.rs index 175730845145..5ac11ea7a46c 100644 --- a/src/common/base/src/base/mod.rs +++ b/src/common/base/src/base/mod.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
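For orientation, here is a minimal usage sketch of the DMA helpers introduced in dma.rs above, modelled on its unit tests. The file name and payload are illustrative only, and the import paths assume the re-exports added to base/mod.rs below:

    // Illustrative round trip through the new O_DIRECT helpers (not part of the patch).
    use std::io::IoSlice;

    use databend_common_base::base::dma_read_file_range;
    use databend_common_base::base::dma_write_file_vectored;

    async fn spill_round_trip() -> std::io::Result<()> {
        let path = "example_spill_file"; // illustrative path
        let payload: Vec<u8> = (0..8193).map(|i| (i % 256) as u8).collect();

        // Write through O_DIRECT: the helper pads the final buffer up to the file's
        // alignment and then truncates the file back to the logical length.
        let written = dma_write_file_vectored(path, &[IoSlice::new(&payload)]).await?;
        assert_eq!(written, payload.len());

        // Read an arbitrary byte range: the result is an aligned DmaBuffer plus the
        // sub-range inside it that maps to the requested bytes.
        let (buf, range) = dma_read_file_range(path, 10..30).await?;
        assert_eq!(&buf[range], &payload[10..30]);

        std::fs::remove_file(path)?;
        Ok(())
    }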
+mod dma; mod net; mod ordered_float; mod profiling; @@ -27,6 +28,10 @@ mod take_mut; mod uniq_id; mod watch_notify; +pub use dma::dma_buffer_as_vec; +pub use dma::dma_read_file; +pub use dma::dma_read_file_range; +pub use dma::dma_write_file_vectored; pub use net::get_free_tcp_port; pub use net::get_free_udp_port; pub use ordered_float::OrderedFloat; diff --git a/src/common/base/src/lib.rs b/src/common/base/src/lib.rs index 04ae30bd8bb0..31060daffe9e 100644 --- a/src/common/base/src/lib.rs +++ b/src/common/base/src/lib.rs @@ -24,6 +24,7 @@ #![feature(alloc_error_hook)] #![feature(slice_swap_unchecked)] #![feature(variant_count)] +#![feature(ptr_alignment_type)] pub mod base; pub mod containers; diff --git a/src/common/base/src/runtime/profile/profiles.rs b/src/common/base/src/runtime/profile/profiles.rs index 3f9a4a40d795..6f75ef5d0afc 100644 --- a/src/common/base/src/runtime/profile/profiles.rs +++ b/src/common/base/src/runtime/profile/profiles.rs @@ -37,12 +37,23 @@ pub enum ProfileStatisticsName { ScanBytes, ScanCacheBytes, ScanPartitions, - SpillWriteCount, - SpillWriteBytes, - SpillWriteTime, - SpillReadCount, - SpillReadBytes, - SpillReadTime, + + RemoteSpillWriteCount, + RemoteSpillWriteBytes, + RemoteSpillWriteTime, + + RemoteSpillReadCount, + RemoteSpillReadBytes, + RemoteSpillReadTime, + + LocalSpillWriteCount, + LocalSpillWriteBytes, + LocalSpillWriteTime, + + LocalSpillReadCount, + LocalSpillReadBytes, + LocalSpillReadTime, + RuntimeFilterPruneParts, MemoryUsage, ExternalServerRetryCount, @@ -189,45 +200,87 @@ pub fn get_statistics_desc() -> Arc unit: StatisticsUnit::Count, plain_statistics: true, }), - (ProfileStatisticsName::SpillWriteCount, ProfileDesc { - display_name: "numbers spilled by write", - desc: "The number of spilled by write", - index: ProfileStatisticsName::SpillWriteCount as usize, + (ProfileStatisticsName::RemoteSpillWriteCount, ProfileDesc { + display_name: "numbers remote spilled by write", + desc: "The number of remote spilled by write", + index: ProfileStatisticsName::RemoteSpillWriteCount as usize, + unit: StatisticsUnit::Count, + plain_statistics: true, + }), + (ProfileStatisticsName::RemoteSpillWriteBytes, ProfileDesc { + display_name: "bytes remote spilled by write", + desc: "The bytes remote spilled by write", + index: ProfileStatisticsName::RemoteSpillWriteBytes as usize, + unit: StatisticsUnit::Bytes, + plain_statistics: true, + }), + (ProfileStatisticsName::RemoteSpillWriteTime, ProfileDesc { + display_name: "remote spilled time by write", + desc: "The time spent to write remote spill in millisecond", + index: ProfileStatisticsName::RemoteSpillWriteTime as usize, + unit: StatisticsUnit::MillisSeconds, + plain_statistics: false, + }), + (ProfileStatisticsName::RemoteSpillReadCount, ProfileDesc { + display_name: "numbers remote spilled by read", + desc: "The number of remote spilled by read", + index: ProfileStatisticsName::RemoteSpillReadCount as usize, + unit: StatisticsUnit::Count, + plain_statistics: true, + }), + (ProfileStatisticsName::RemoteSpillReadBytes, ProfileDesc { + display_name: "bytes remote spilled by read", + desc: "The bytes remote spilled by read", + index: ProfileStatisticsName::RemoteSpillReadBytes as usize, + unit: StatisticsUnit::Bytes, + plain_statistics: true, + }), + (ProfileStatisticsName::RemoteSpillReadTime, ProfileDesc { + display_name: "remote spilled time by read", + desc: "The time spent to read remote spill in millisecond", + index: ProfileStatisticsName::RemoteSpillReadTime as usize, + unit: 
StatisticsUnit::MillisSeconds, + plain_statistics: false, + }), + (ProfileStatisticsName::LocalSpillWriteCount, ProfileDesc { + display_name: "numbers local spilled by write", + desc: "The number of local spilled by write", + index: ProfileStatisticsName::LocalSpillWriteCount as usize, unit: StatisticsUnit::Count, plain_statistics: true, }), - (ProfileStatisticsName::SpillWriteBytes, ProfileDesc { - display_name: "bytes spilled by write", - desc: "The bytes spilled by write", - index: ProfileStatisticsName::SpillWriteBytes as usize, + (ProfileStatisticsName::LocalSpillWriteBytes, ProfileDesc { + display_name: "bytes local spilled by write", + desc: "The bytes local spilled by write", + index: ProfileStatisticsName::LocalSpillWriteBytes as usize, unit: StatisticsUnit::Bytes, plain_statistics: true, }), - (ProfileStatisticsName::SpillWriteTime, ProfileDesc { - display_name: "spilled time by write", - desc: "The time spent to write spill in millisecond", - index: ProfileStatisticsName::SpillWriteTime as usize, + (ProfileStatisticsName::LocalSpillWriteTime, ProfileDesc { + display_name: "local spilled time by write", + desc: "The time spent to write local spill in millisecond", + index: ProfileStatisticsName::LocalSpillWriteTime as usize, unit: StatisticsUnit::MillisSeconds, plain_statistics: false, }), - (ProfileStatisticsName::SpillReadCount, ProfileDesc { - display_name: "numbers spilled by read", - desc: "The number of spilled by read", - index: ProfileStatisticsName::SpillReadCount as usize, + (ProfileStatisticsName::LocalSpillReadCount, ProfileDesc { + display_name: "numbers local spilled by read", + desc: "The number of local spilled by read", + index: ProfileStatisticsName::LocalSpillReadCount as usize, unit: StatisticsUnit::Count, plain_statistics: true, }), - (ProfileStatisticsName::SpillReadBytes, ProfileDesc { - display_name: "bytes spilled by read", - desc: "The bytes spilled by read", - index: ProfileStatisticsName::SpillReadBytes as usize, + (ProfileStatisticsName::LocalSpillReadBytes, ProfileDesc { + display_name: "bytes local spilled by read", + desc: "The bytes local spilled by read", + index: ProfileStatisticsName::LocalSpillReadBytes as usize, unit: StatisticsUnit::Bytes, plain_statistics: true, }), - (ProfileStatisticsName::SpillReadTime, ProfileDesc { - display_name: "spilled time by read", - desc: "The time spent to read spill in millisecond", - index: ProfileStatisticsName::SpillReadTime as usize, + (ProfileStatisticsName::LocalSpillReadTime, ProfileDesc { + display_name: "local spilled time by read", + desc: "The time spent to read local spill in millisecond", + index: ProfileStatisticsName::LocalSpillReadTime as usize, unit: StatisticsUnit::MillisSeconds, plain_statistics: false, }), diff --git a/src/common/tracing/Cargo.toml b/src/common/tracing/Cargo.toml index 58ebe2903dbf..e0c549930abc 100644 --- a/src/common/tracing/Cargo.toml +++ b/src/common/tracing/Cargo.toml @@ -21,7 +21,7 @@ defer = "0.2" fastrace = { workspace = true } fastrace-opentelemetry = { workspace = true } itertools = { workspace = true } -libc = "0.2.153" +libc = { workspace = true } log = { workspace = true } logforth = { version = "0.12", features = [ 'json', diff --git a/src/query/config/src/config.rs b/src/query/config/src/config.rs index 767bdb255b22..ab1046e115c0 100644 --- a/src/query/config/src/config.rs +++ b/src/query/config/src/config.rs @@ -15,6 +15,7 @@ use std::collections::BTreeMap; use std::collections::HashMap; use std::env; +use std::ffi::OsString; use std::fmt; use 
std::fmt::Debug; use std::fmt::Formatter; @@ -25,6 +26,7 @@ use clap::Parser; use clap::Subcommand; use clap::ValueEnum; use databend_common_base::base::mask_string; +use databend_common_base::base::OrderedFloat; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_meta_app::principal::UserSettingValue; @@ -132,6 +134,10 @@ pub struct Config { #[clap(flatten)] pub cache: CacheConfig, + // spill Config + #[clap(flatten)] + pub spill: SpillConfig, + // background configs #[clap(flatten)] pub background: BackgroundConfig, @@ -2930,7 +2936,29 @@ pub struct DiskCacheConfig { pub sync_data: bool, } +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Args, Default)] +#[serde(default, deny_unknown_fields)] +pub struct SpillConfig { + /// Path of spill to local disk. disable if it's empty. + #[clap( + long, + value_name = "VALUE", + default_value = "./.databend/temp/_query_spill" + )] + pub spill_local_disk_path: OsString, + + #[clap(long, value_name = "VALUE", default_value = "30")] + /// Percentage of reserve disk space that won't be used for spill to local disk. + pub spill_local_disk_reserved_space_percentage: OrderedFloat, + + #[clap(long, value_name = "VALUE", default_value = "18446744073709551615")] + /// Allow space in bytes to spill to local disk. + pub spill_local_disk_max_bytes: u64, +} + mod cache_config_converters { + use std::path::PathBuf; + use log::warn; use super::*; @@ -2953,6 +2981,7 @@ mod cache_config_converters { .map(|(k, v)| (k, v.into())) .collect(), cache: inner.cache.into(), + spill: inner.spill.into(), background: inner.background.into(), } } @@ -2962,30 +2991,55 @@ mod cache_config_converters { type Error = ErrorCode; fn try_into(self) -> Result { + let Config { + subcommand, + config_file, + query, + log, + meta, + storage, + catalog, + cache, + mut spill, + background, + catalogs: input_catalogs, + .. + } = self; + let mut catalogs = HashMap::new(); - for (k, v) in self.catalogs.into_iter() { + for (k, v) in input_catalogs.into_iter() { let catalog = v.try_into()?; catalogs.insert(k, catalog); } - if !self.catalog.address.is_empty() || !self.catalog.protocol.is_empty() { + if !catalog.address.is_empty() || !catalog.protocol.is_empty() { warn!( "`catalog` is planned to be deprecated, please add catalog in `catalogs` instead" ); - let hive = self.catalog.try_into()?; + let hive = catalog.try_into()?; let catalog = InnerCatalogConfig::Hive(hive); catalogs.insert(CATALOG_HIVE.to_string(), catalog); } + // Trick for cloud, perhaps we should introduce a new configuration for the local writeable root. 
+ if cache.disk_cache_config.path != inner::DiskCacheConfig::default().path + && spill.spill_local_disk_path == inner::SpillConfig::default().path + { + spill.spill_local_disk_path = PathBuf::from(&cache.disk_cache_config.path) + .join("temp/_query_spill") + .into(); + }; + Ok(InnerConfig { - subcommand: self.subcommand, - config_file: self.config_file, - query: self.query.try_into()?, - log: self.log.try_into()?, - meta: self.meta.try_into()?, - storage: self.storage.try_into()?, + subcommand, + config_file, + query: query.try_into()?, + log: log.try_into()?, + meta: meta.try_into()?, + storage: storage.try_into()?, catalogs, - cache: self.cache.try_into()?, - background: self.background.try_into()?, + cache: cache.try_into()?, + spill: spill.try_into()?, + background: background.try_into()?, }) } } @@ -3047,6 +3101,41 @@ mod cache_config_converters { } } + impl TryFrom for inner::SpillConfig { + type Error = ErrorCode; + + fn try_from(value: SpillConfig) -> std::result::Result { + let SpillConfig { + spill_local_disk_path, + spill_local_disk_reserved_space_percentage: spill_local_disk_max_space_percentage, + spill_local_disk_max_bytes, + } = value; + if !spill_local_disk_max_space_percentage.is_normal() + || spill_local_disk_max_space_percentage.is_sign_negative() + || spill_local_disk_max_space_percentage > OrderedFloat(100.0) + { + return Err(ErrorCode::InvalidArgument( + "invalid spill_local_disk_max_space_percentage", + )); + } + Ok(Self { + path: spill_local_disk_path, + reserved_disk_ratio: spill_local_disk_max_space_percentage / 100.0, + global_bytes_limit: spill_local_disk_max_bytes, + }) + } + } + + impl From for SpillConfig { + fn from(value: inner::SpillConfig) -> Self { + Self { + spill_local_disk_path: value.path, + spill_local_disk_reserved_space_percentage: value.reserved_disk_ratio * 100.0, + spill_local_disk_max_bytes: value.global_bytes_limit, + } + } + } + impl TryFrom for inner::DiskCacheConfig { type Error = ErrorCode; fn try_from(value: DiskCacheConfig) -> std::result::Result { diff --git a/src/query/config/src/inner.rs b/src/query/config/src/inner.rs index cf6dc8fb847f..fb4a32c8afa2 100644 --- a/src/query/config/src/inner.rs +++ b/src/query/config/src/inner.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::HashMap; +use std::ffi::OsString; use std::fmt; use std::fmt::Debug; use std::fmt::Display; @@ -22,6 +23,7 @@ use std::time::Duration; use databend_common_base::base::mask_string; use databend_common_base::base::GlobalUniqName; +use databend_common_base::base::OrderedFloat; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_grpc::RpcClientConf; @@ -64,6 +66,9 @@ pub struct InnerConfig { // Cache Config pub cache: CacheConfig, + // Spill Config + pub spill: SpillConfig, + // Background Config pub background: InnerBackgroundConfig, } @@ -141,6 +146,7 @@ impl Debug for InnerConfig { .field("storage", &self.storage) .field("catalogs", &self.catalogs) .field("cache", &self.cache) + .field("spill", &self.spill) .field("background", &self.background) .finish() } @@ -701,3 +707,25 @@ impl Default for CacheConfig { } } } + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct SpillConfig { + /// Path of spill to local disk. disable if it's empty. + pub path: OsString, + + /// Ratio of the reserve of the disk space. + pub reserved_disk_ratio: OrderedFloat, + + /// Allow bytes use of disk space. 
+ pub global_bytes_limit: u64, +} + +impl Default for SpillConfig { + fn default() -> Self { + Self { + path: OsString::from("./.databend/temp/_query_spill"), + reserved_disk_ratio: OrderedFloat(0.3), + global_bytes_limit: u64::MAX, + } + } +} diff --git a/src/query/config/src/lib.rs b/src/query/config/src/lib.rs index 269241a72bed..c6a9313447c9 100644 --- a/src/query/config/src/lib.rs +++ b/src/query/config/src/lib.rs @@ -49,6 +49,7 @@ pub use inner::CatalogConfig; pub use inner::CatalogHiveConfig; pub use inner::DiskCacheKeyReloadPolicy; pub use inner::InnerConfig; +pub use inner::SpillConfig; pub use inner::ThriftProtocol; pub use version::DATABEND_COMMIT_VERSION; pub use version::QUERY_GIT_SEMVER; diff --git a/src/query/config/src/mask.rs b/src/query/config/src/mask.rs index 37fae7279aac..62a5086b2c52 100644 --- a/src/query/config/src/mask.rs +++ b/src/query/config/src/mask.rs @@ -51,6 +51,7 @@ impl Config { storage: self.storage.mask_display(), catalog: self.catalog, cache: self.cache, + spill: self.spill, background: self.background, catalogs: self.catalogs, } diff --git a/src/query/service/src/global_services.rs b/src/query/service/src/global_services.rs index c9f91b8818ef..c40a4ebb8601 100644 --- a/src/query/service/src/global_services.rs +++ b/src/query/service/src/global_services.rs @@ -35,6 +35,7 @@ use databend_common_users::builtin::BuiltIn; use databend_common_users::RoleCacheManager; use databend_common_users::UserApiProvider; use databend_storages_common_cache::CacheManager; +use databend_storages_common_cache::TempDirManager; use crate::auth::AuthMgr; use crate::builtin::BuiltinUDFs; @@ -146,6 +147,7 @@ impl GlobalServices { &config.query.max_server_memory_usage, config.query.tenant_id.tenant_name().to_string(), )?; + TempDirManager::init(&config.spill, config.query.tenant_id.tenant_name())?; if let Some(addr) = config.query.cloud_control_grpc_server_address.clone() { CloudControlApiProvider::init(addr, config.query.cloud_control_grpc_timeout).await?; diff --git a/src/query/service/src/interpreters/hook/vacuum_hook.rs b/src/query/service/src/interpreters/hook/vacuum_hook.rs index 35d0682bba30..21f01ea8cc6f 100644 --- a/src/query/service/src/interpreters/hook/vacuum_hook.rs +++ b/src/query/service/src/interpreters/hook/vacuum_hook.rs @@ -23,7 +23,10 @@ use databend_common_license::license_manager::LicenseManagerSwitch; use databend_common_pipeline_core::query_spill_prefix; use databend_common_storage::DataOperator; use databend_enterprise_vacuum_handler::get_vacuum_handler; +use databend_storages_common_cache::TempDirManager; +use log::warn; use opendal::Buffer; +use rand::Rng; use crate::sessions::QueryContext; @@ -65,3 +68,19 @@ pub fn hook_vacuum_temp_files(query_ctx: &Arc) -> Result<()> { Ok(()) } + +pub fn hook_disk_temp_dir(query_ctx: &Arc) -> Result<()> { + let mgr = TempDirManager::instance(); + + if mgr.drop_disk_spill_dir(&query_ctx.get_id())? 
&& rand::thread_rng().gen_ratio(1, 10) { + let limit = query_ctx + .get_settings() + .get_spilling_to_disk_vacuum_unknown_temp_dirs_limit()?; + let deleted = mgr.drop_disk_spill_dir_unknown(limit)?; + if !deleted.is_empty() { + warn!("Deleted residual temporary directories: {:?}", deleted) + } + } + + Ok(()) +} diff --git a/src/query/service/src/interpreters/interpreter.rs b/src/query/service/src/interpreters/interpreter.rs index c74892dc32a0..1f220e17edf4 100644 --- a/src/query/service/src/interpreters/interpreter.rs +++ b/src/query/service/src/interpreters/interpreter.rs @@ -44,10 +44,11 @@ use log::info; use md5::Digest; use md5::Md5; -use crate::interpreters::hook::vacuum_hook::hook_vacuum_temp_files; -use crate::interpreters::interpreter_txn_commit::CommitInterpreter; -use crate::interpreters::InterpreterMetrics; -use crate::interpreters::InterpreterQueryLog; +use super::hook::vacuum_hook::hook_disk_temp_dir; +use super::hook::vacuum_hook::hook_vacuum_temp_files; +use super::interpreter_txn_commit::CommitInterpreter; +use super::InterpreterMetrics; +use super::InterpreterQueryLog; use crate::pipelines::executor::ExecutorSettings; use crate::pipelines::executor::PipelineCompleteExecutor; use crate::pipelines::executor::PipelinePullingExecutor; @@ -285,6 +286,8 @@ pub fn on_execution_finished(info: &ExecutionInfo, query_ctx: Arc) hook_vacuum_temp_files(&query_ctx)?; + hook_disk_temp_dir(&query_ctx)?; + let err_opt = match &info.res { Ok(_) => None, Err(e) => Some(e.clone()), diff --git a/src/query/service/src/pipelines/builders/builder_sort.rs b/src/query/service/src/pipelines/builders/builder_sort.rs index afbb3e25b85a..cb039e615312 100644 --- a/src/query/service/src/pipelines/builders/builder_sort.rs +++ b/src/query/service/src/pipelines/builders/builder_sort.rs @@ -282,14 +282,17 @@ impl SortPipelineBuilder { if may_spill { let schema = add_order_field(sort_merge_output_schema.clone(), &self.sort_desc); - let config = SpillerConfig::create(query_spill_prefix( - self.ctx.get_tenant().tenant_name(), - &self.ctx.get_id(), - )); + let config = SpillerConfig { + location_prefix: query_spill_prefix( + self.ctx.get_tenant().tenant_name(), + &self.ctx.get_id(), + ), + disk_spill: None, + spiller_type: SpillerType::OrderBy, + }; pipeline.add_transform(|input, output| { let op = DataOperator::instance().operator(); - let spiller = - Spiller::create(self.ctx.clone(), op, config.clone(), SpillerType::OrderBy)?; + let spiller = Spiller::create(self.ctx.clone(), op, config.clone())?; Ok(ProcessorPtr::create(create_transform_sort_spill( input, output, diff --git a/src/query/service/src/pipelines/builders/builder_window.rs b/src/query/service/src/pipelines/builders/builder_window.rs index 8fa28c443dbc..0bddf1fb2aed 100644 --- a/src/query/service/src/pipelines/builders/builder_window.rs +++ b/src/query/service/src/pipelines/builders/builder_window.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
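The hunks in builder_sort.rs above and in builder_window.rs and hash_join_spiller.rs below all follow the same pattern: SpillerConfig::create(prefix) is replaced by a struct literal that also carries the optional local disk directory. A condensed sketch of that pattern, assuming `ctx` (a query context) and `disk_spill: Option<Arc<TempDir>>` are in scope:

    // Sketch of the new construction pattern; `ctx` and `disk_spill` are assumed to be
    // in scope. Passing `disk_spill: None` keeps the previous remote-only behaviour.
    let spill_config = SpillerConfig {
        location_prefix: query_spill_prefix(ctx.get_tenant().tenant_name(), &ctx.get_id()),
        disk_spill,
        spiller_type: SpillerType::Window,
    };
    let operator = DataOperator::instance().operator();
    let spiller = Spiller::create(ctx.clone(), operator, spill_config)?;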
+use std::sync::atomic; use std::sync::atomic::AtomicUsize; use databend_common_catalog::table_context::TableContext; @@ -24,6 +25,7 @@ use databend_common_pipeline_core::processors::Processor; use databend_common_pipeline_core::processors::ProcessorPtr; use databend_common_sql::executor::physical_plans::Window; use databend_common_sql::executor::physical_plans::WindowPartition; +use databend_storages_common_cache::TempDirManager; use crate::pipelines::processors::transforms::FrameBound; use crate::pipelines::processors::transforms::TransformWindowPartitionCollect; @@ -141,11 +143,6 @@ impl PipelineBuilder { // Settings. let settings = self.ctx.get_settings(); let num_partitions = settings.get_window_num_partitions()?; - let max_block_size = settings.get_max_block_size()? as usize; - let sort_block_size = settings.get_window_partition_sort_block_size()? as usize; - let sort_spilling_batch_bytes = settings.get_sort_spilling_batch_bytes()?; - let enable_loser_tree = settings.get_enable_loser_tree_merge_sort()?; - let window_spill_settings = WindowSpillSettings::new(settings.clone(), num_processors)?; let plan_schema = window_partition.output_schema()?; @@ -169,13 +166,18 @@ impl PipelineBuilder { }) .collect::>>()?; - let have_order_col = window_partition.after_exchange.unwrap_or(false); - self.main_pipeline.exchange( num_processors, WindowPartitionExchange::create(partition_by.clone(), num_partitions), ); + let disk_bytes_limit = settings.get_window_partition_spilling_to_disk_bytes_limit()?; + let temp_dir_manager = TempDirManager::instance(); + let disk_spill = temp_dir_manager.get_disk_spill_dir(disk_bytes_limit, &self.ctx.get_id()); + + let window_spill_settings = WindowSpillSettings::new(&settings, num_processors)?; + let have_order_col = window_partition.after_exchange.unwrap_or(false); + let processor_id = AtomicUsize::new(0); self.main_pipeline.add_transform(|input, output| { Ok(ProcessorPtr::create(Box::new( @@ -183,16 +185,14 @@ impl PipelineBuilder { self.ctx.clone(), input, output, - processor_id.fetch_add(1, std::sync::atomic::Ordering::AcqRel), + &settings, + processor_id.fetch_add(1, atomic::Ordering::AcqRel), num_processors, num_partitions, window_spill_settings.clone(), + disk_spill.clone(), sort_desc.clone(), plan_schema.clone(), - max_block_size, - sort_block_size, - sort_spilling_batch_bytes, - enable_loser_tree, have_order_col, )?, ))) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_spill_writer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_spill_writer.rs index 787a199fe537..3b3d56586b54 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_spill_writer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_spill_writer.rs @@ -269,10 +269,13 @@ pub fn agg_spilling_aggregate_payload( // perf { - Profile::record_usize_profile(ProfileStatisticsName::SpillWriteCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::SpillWriteBytes, write_bytes); + Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, 1); Profile::record_usize_profile( - ProfileStatisticsName::SpillWriteTime, + ProfileStatisticsName::RemoteSpillWriteBytes, + write_bytes, + ); + Profile::record_usize_profile( + ProfileStatisticsName::RemoteSpillWriteTime, instant.elapsed().as_millis() as usize, ); } @@ -366,10 +369,13 @@ pub fn spilling_aggregate_payload( // perf { - 
Profile::record_usize_profile(ProfileStatisticsName::SpillWriteCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::SpillWriteBytes, write_bytes); + Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, 1); + Profile::record_usize_profile( + ProfileStatisticsName::RemoteSpillWriteBytes, + write_bytes, + ); Profile::record_usize_profile( - ProfileStatisticsName::SpillWriteTime, + ProfileStatisticsName::RemoteSpillWriteTime, instant.elapsed().as_millis() as usize, ); } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs index 8d621aba7ea6..7e38f9ec41e0 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs @@ -311,10 +311,13 @@ fn agg_spilling_aggregate_payload( // perf { - Profile::record_usize_profile(ProfileStatisticsName::SpillWriteCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::SpillWriteBytes, write_bytes); + Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, 1); Profile::record_usize_profile( - ProfileStatisticsName::SpillWriteTime, + ProfileStatisticsName::RemoteSpillWriteBytes, + write_bytes, + ); + Profile::record_usize_profile( + ProfileStatisticsName::RemoteSpillWriteTime, instant.elapsed().as_millis() as usize, ); } @@ -432,10 +435,13 @@ fn spilling_aggregate_payload( // perf { - Profile::record_usize_profile(ProfileStatisticsName::SpillWriteCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::SpillWriteBytes, write_bytes); + Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, 1); + Profile::record_usize_profile( + ProfileStatisticsName::RemoteSpillWriteBytes, + write_bytes, + ); Profile::record_usize_profile( - ProfileStatisticsName::SpillWriteTime, + ProfileStatisticsName::RemoteSpillWriteTime, instant.elapsed().as_millis() as usize, ); } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs index bdd2b95e29cd..d68a956d1ec9 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs @@ -365,10 +365,13 @@ fn agg_spilling_group_by_payload( // perf { - Profile::record_usize_profile(ProfileStatisticsName::SpillWriteCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::SpillWriteBytes, write_bytes); + Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, 1); Profile::record_usize_profile( - ProfileStatisticsName::SpillWriteTime, + ProfileStatisticsName::RemoteSpillWriteBytes, + write_bytes, + ); + Profile::record_usize_profile( + ProfileStatisticsName::RemoteSpillWriteTime, instant.elapsed().as_millis() as usize, ); } @@ -484,10 +487,13 @@ fn spilling_group_by_payload( // perf { - Profile::record_usize_profile(ProfileStatisticsName::SpillWriteCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::SpillWriteBytes, write_bytes); + 
Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, 1); + Profile::record_usize_profile( + ProfileStatisticsName::RemoteSpillWriteBytes, + write_bytes, + ); Profile::record_usize_profile( - ProfileStatisticsName::SpillWriteTime, + ProfileStatisticsName::RemoteSpillWriteTime, instant.elapsed().as_millis() as usize, ); } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_spill_writer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_spill_writer.rs index 5a3a35219780..04d0c36b7f3d 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_spill_writer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_spill_writer.rs @@ -267,10 +267,13 @@ pub fn agg_spilling_group_by_payload( // perf { - Profile::record_usize_profile(ProfileStatisticsName::SpillWriteCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::SpillWriteBytes, write_bytes); + Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, 1); Profile::record_usize_profile( - ProfileStatisticsName::SpillWriteTime, + ProfileStatisticsName::RemoteSpillWriteBytes, + write_bytes, + ); + Profile::record_usize_profile( + ProfileStatisticsName::RemoteSpillWriteTime, instant.elapsed().as_millis() as usize, ); } @@ -360,10 +363,13 @@ pub fn spilling_group_by_payload( // perf { - Profile::record_usize_profile(ProfileStatisticsName::SpillWriteCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::SpillWriteBytes, write_bytes); + Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, 1); + Profile::record_usize_profile( + ProfileStatisticsName::RemoteSpillWriteBytes, + write_bytes, + ); Profile::record_usize_profile( - ProfileStatisticsName::SpillWriteTime, + ProfileStatisticsName::RemoteSpillWriteTime, instant.elapsed().as_millis() as usize, ); } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs index 3bf4f8bd7d3a..f625de75db9b 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs @@ -227,15 +227,15 @@ impl Processor // perf { Profile::record_usize_profile( - ProfileStatisticsName::SpillReadCount, + ProfileStatisticsName::RemoteSpillReadCount, 1, ); Profile::record_usize_profile( - ProfileStatisticsName::SpillReadBytes, + ProfileStatisticsName::RemoteSpillReadBytes, data.len(), ); Profile::record_usize_profile( - ProfileStatisticsName::SpillReadTime, + ProfileStatisticsName::RemoteSpillReadTime, instant.elapsed().as_millis() as usize, ); } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs index c9c0a9977341..876542882644 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs @@ -62,17 +62,18 @@ impl HashJoinSpiller { is_build_side: bool, ) -> Result { // Create a Spiller for spilling build side data. 
- let spill_config = SpillerConfig::create(query_spill_prefix( - ctx.get_tenant().tenant_name(), - &ctx.get_id(), - )); - let operator = DataOperator::instance().operator(); let spiller_type = if is_build_side { SpillerType::HashJoinBuild } else { SpillerType::HashJoinProbe }; - let spiller = Spiller::create(ctx.clone(), operator, spill_config, spiller_type)?; + let spill_config = SpillerConfig { + location_prefix: query_spill_prefix(ctx.get_tenant().tenant_name(), &ctx.get_id()), + disk_spill: None, + spiller_type, + }; + let operator = DataOperator::instance().operator(); + let spiller = Spiller::create(ctx.clone(), operator, spill_config)?; let num_partitions = (1 << spill_partition_bits) as usize; // The memory threshold of each partition, we will spill the partition data diff --git a/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs b/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs index 67e9e0d21c4c..f0c6bd97d556 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs @@ -43,6 +43,7 @@ use databend_common_pipeline_transforms::processors::sort::SortSpillMeta; use databend_common_pipeline_transforms::processors::sort::SortSpillMetaWithParams; use databend_common_pipeline_transforms::processors::sort::SortedStream; +use crate::spillers::Location; use crate::spillers::Spiller; enum State { @@ -77,7 +78,7 @@ pub struct TransformSortSpill { /// Blocks to merge one time. num_merge: usize, /// Unmerged list of blocks. Each list are sorted. - unmerged_blocks: VecDeque>, + unmerged_blocks: VecDeque>, /// If `ummerged_blocks.len()` < `num_merge`, /// we can use a final merger to merge the last few sorted streams to reduce IO. 
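The spilled block lists in transform_sort_spill.rs change from Vec<String> (remote object keys) to Vec<Location>. The Location type itself is defined in src/query/service/src/spillers/spiller.rs, which is modified later in this patch and not shown in this excerpt; the sketch below is only an assumption about its shape, not the actual definition:

    // Assumed shape only -- the real definition lives in spillers/spiller.rs.
    // The point is that a spilled payload can now live either in the remote object
    // store or in a file under the per-query local temp directory.
    pub enum Location {
        Remote(String),            // object-store key under the query spill prefix
        Local(std::path::PathBuf), // hypothetical payload: file inside the temp spill dir
    }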
@@ -359,7 +360,7 @@ where R: Rows + Sync + Send + 'static } enum BlockStream { - Spilled((VecDeque, Arc)), + Spilled((VecDeque, Arc)), Block(Option), } @@ -485,12 +486,13 @@ mod tests { limit: Option, ) -> Result>> { let op = DataOperator::instance().operator(); - let spiller = Spiller::create( - ctx.clone(), - op, - SpillerConfig::create("_spill_test".to_string()), - SpillerType::OrderBy, - )?; + let spill_config = SpillerConfig { + location_prefix: "_spill_test".to_string(), + disk_spill: None, + spiller_type: SpillerType::OrderBy, + }; + + let spiller = Spiller::create(ctx.clone(), op, spill_config)?; let sort_desc = Arc::new(vec![SortColumnDescription { offset: 0, diff --git a/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs b/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs index e81138ce4ea5..f983b6208e65 100644 --- a/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs +++ b/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs @@ -29,12 +29,20 @@ use databend_common_pipeline_core::processors::Event; use databend_common_pipeline_core::processors::InputPort; use databend_common_pipeline_core::processors::OutputPort; use databend_common_pipeline_core::processors::Processor; +use databend_common_pipeline_core::query_spill_prefix; use databend_common_pipeline_transforms::processors::sort_merge; +use databend_common_settings::Settings; +use databend_common_storage::DataOperator; +use databend_common_storages_fuse::TableContext; +use databend_storages_common_cache::TempDir; use super::WindowPartitionBuffer; use super::WindowPartitionMeta; use super::WindowSpillSettings; use crate::sessions::QueryContext; +use crate::spillers::Spiller; +use crate::spillers::SpillerConfig; +use crate::spillers::SpillerType; #[derive(Debug, Clone, Copy)] pub enum Step { @@ -81,21 +89,19 @@ pub struct TransformWindowPartitionCollect { } impl TransformWindowPartitionCollect { - #[allow(clippy::too_many_arguments)] + #[expect(clippy::too_many_arguments)] pub fn new( ctx: Arc, input: Arc, output: Arc, + settings: &Settings, processor_id: usize, num_processors: usize, num_partitions: usize, spill_settings: WindowSpillSettings, + disk_spill: Option>, sort_desc: Vec, schema: DataSchemaRef, - max_block_size: usize, - sort_block_size: usize, - sort_spilling_batch_bytes: usize, - enable_loser_tree: bool, have_order_col: bool, ) -> Result { // Calculate the partition ids collected by the processor. @@ -109,9 +115,24 @@ impl TransformWindowPartitionCollect { partition_id[*partition] = new_partition_id; } + let spill_config = SpillerConfig { + location_prefix: query_spill_prefix(ctx.get_tenant().tenant_name(), &ctx.get_id()), + disk_spill, + spiller_type: SpillerType::Window, + }; + + // Create an inner `Spiller` to spill data. + let operator = DataOperator::instance().operator(); + let spiller = Spiller::create(ctx, operator, spill_config)?; + // Create the window partition buffer. + let sort_block_size = settings.get_window_partition_sort_block_size()? as usize; let buffer = - WindowPartitionBuffer::new(ctx, partitions.len(), sort_block_size, spill_settings)?; + WindowPartitionBuffer::new(spiller, partitions.len(), sort_block_size, spill_settings)?; + + let max_block_size = settings.get_max_block_size()? 
as usize; + let enable_loser_tree = settings.get_enable_loser_tree_merge_sort()?; + let sort_spilling_batch_bytes = settings.get_sort_spilling_batch_bytes()?; Ok(Self { input, diff --git a/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer.rs b/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer.rs index dd93b0016c3c..28c6b9b2068d 100644 --- a/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer.rs +++ b/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer.rs @@ -12,23 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - use databend_common_base::runtime::GLOBAL_MEM_STAT; use databend_common_exception::Result; use databend_common_expression::DataBlock; -use databend_common_pipeline_core::query_spill_prefix; use databend_common_settings::Settings; -use databend_common_storage::DataOperator; -use databend_common_storages_fuse::TableContext; -use crate::sessions::QueryContext; use crate::spillers::PartitionBuffer; use crate::spillers::PartitionBufferFetchOption; use crate::spillers::SpilledData; use crate::spillers::Spiller; -use crate::spillers::SpillerConfig; -use crate::spillers::SpillerType; /// The `WindowPartitionBuffer` is used to control memory usage of Window operator. pub struct WindowPartitionBuffer { @@ -46,19 +38,11 @@ pub struct WindowPartitionBuffer { impl WindowPartitionBuffer { pub fn new( - ctx: Arc, + spiller: Spiller, num_partitions: usize, sort_block_size: usize, spill_settings: WindowSpillSettings, ) -> Result { - // Create an inner `Spiller` to spill data. - let spill_config = SpillerConfig::create(query_spill_prefix( - ctx.get_tenant().tenant_name(), - &ctx.get_id(), - )); - let operator = DataOperator::instance().operator(); - let spiller = Spiller::create(ctx.clone(), operator, spill_config, SpillerType::Window)?; - // Create a `PartitionBuffer` to store partitioned data. 
let partition_buffer = PartitionBuffer::create(num_partitions); let restored_partition_buffer = PartitionBuffer::create(num_partitions); @@ -296,7 +280,7 @@ pub struct WindowSpillSettings { } impl WindowSpillSettings { - pub fn new(settings: Arc, num_threads: usize) -> Result { + pub fn new(settings: &Settings, num_threads: usize) -> Result { let global_memory_ratio = std::cmp::min(settings.get_window_partition_spilling_memory_ratio()?, 100) as f64 / 100_f64; diff --git a/src/query/service/src/spillers/mod.rs b/src/query/service/src/spillers/mod.rs index b771bce717f7..31d420849041 100644 --- a/src/query/service/src/spillers/mod.rs +++ b/src/query/service/src/spillers/mod.rs @@ -17,6 +17,7 @@ mod spiller; pub use partition_buffer::PartitionBuffer; pub use partition_buffer::PartitionBufferFetchOption; +pub use spiller::Location; pub use spiller::SpilledData; pub use spiller::Spiller; pub use spiller::SpillerConfig; diff --git a/src/query/service/src/spillers/spiller.rs b/src/query/service/src/spillers/spiller.rs index 54c4bd58129d..d13229b65df6 100644 --- a/src/query/service/src/spillers/spiller.rs +++ b/src/query/service/src/spillers/spiller.rs @@ -16,10 +16,14 @@ use std::collections::HashMap; use std::collections::HashSet; use std::fmt::Display; use std::fmt::Formatter; +use std::io; use std::ops::Range; use std::sync::Arc; use std::time::Instant; +use databend_common_base::base::dma_buffer_as_vec; +use databend_common_base::base::dma_read_file_range; +use databend_common_base::base::dma_write_file_vectored; use databend_common_base::base::GlobalUniqName; use databend_common_base::base::ProgressValues; use databend_common_base::runtime::profile::Profile; @@ -29,6 +33,8 @@ use databend_common_exception::Result; use databend_common_expression::arrow::deserialize_column; use databend_common_expression::arrow::serialize_column; use databend_common_expression::DataBlock; +use databend_storages_common_cache::TempDir; +use databend_storages_common_cache::TempPath; use opendal::Operator; use crate::sessions::QueryContext; @@ -59,12 +65,8 @@ impl Display for SpillerType { #[derive(Clone)] pub struct SpillerConfig { pub location_prefix: String, -} - -impl SpillerConfig { - pub fn create(location_prefix: String) -> Self { - Self { location_prefix } - } + pub disk_spill: Option>, + pub spiller_type: SpillerType, } /// Spiller is a unified framework for operators which need to spill data from memory. @@ -77,13 +79,14 @@ impl SpillerConfig { pub struct Spiller { ctx: Arc, operator: Operator, - config: SpillerConfig, + location_prefix: String, + disk_spill: Option>, _spiller_type: SpillerType, pub join_spilling_partition_bits: usize, /// 1 partition -> N partition files - pub partition_location: HashMap>, + pub partition_location: HashMap>, /// Record columns layout for spilled data, will be used when read data from disk - pub columns_layout: HashMap>, + pub columns_layout: HashMap>, /// Record how many bytes have been spilled for each partition. 
pub partition_spilled_bytes: HashMap, } @@ -94,13 +97,18 @@ impl Spiller { ctx: Arc, operator: Operator, config: SpillerConfig, - spiller_type: SpillerType, ) -> Result { let join_spilling_partition_bits = ctx.get_settings().get_join_spilling_partition_bits()?; + let SpillerConfig { + location_prefix, + disk_spill, + spiller_type, + } = config; Ok(Self { - ctx: ctx.clone(), + ctx, operator, - config, + location_prefix, + disk_spill, _spiller_type: spiller_type, join_spilling_partition_bits, partition_location: Default::default(), @@ -114,31 +122,20 @@ impl Spiller { } /// Spill a [`DataBlock`] to storage. - pub async fn spill(&mut self, data_block: DataBlock) -> Result { - // Serialize data block. - let (data_size, columns_data, columns_layout) = self.serialize_data_block(data_block)?; + pub async fn spill(&mut self, data_block: DataBlock) -> Result { + let instant = Instant::now(); // Spill data to storage. - let instant = Instant::now(); - let unique_name = GlobalUniqName::unique(); - let location = format!("{}/{}", self.config.location_prefix, unique_name); - let mut writer = self - .operator - .writer_with(&location) - .chunk(8 * 1024 * 1024) - .await?; - for data in columns_data.into_iter() { - writer.write(data).await?; - } - writer.close().await?; + let encoded = EncodedBlock::from_block(data_block); + let columns_layout = encoded.columns_layout(); + let data_size = encoded.size(); + let location = self.write_encodes(data_size, vec![encoded]).await?; // Record statistics. - Profile::record_usize_profile(ProfileStatisticsName::SpillWriteCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::SpillWriteBytes, data_size as usize); - Profile::record_usize_profile( - ProfileStatisticsName::SpillWriteTime, - instant.elapsed().as_millis() as usize, - ); + match location { + Location::Remote(_) => record_remote_write_profile(&instant, data_size), + Location::Local(_) => record_local_write_profile(&instant, data_size), + } // Record columns layout for spilled data. self.columns_layout.insert(location.clone(), columns_layout); @@ -187,37 +184,25 @@ impl Spiller { let mut spilled_partitions = Vec::with_capacity(partitioned_data.len()); for (partition_id, data_block) in partitioned_data.into_iter() { let begin = write_bytes; - let (data_size, columns_data, columns_layout) = - self.serialize_data_block(data_block)?; + + let encoded = EncodedBlock::from_block(data_block); + let columns_layout = encoded.columns_layout(); + let data_size = encoded.size(); write_bytes += data_size; - write_data.push(columns_data); + write_data.push(encoded); spilled_partitions.push((partition_id, begin..write_bytes, columns_layout)); } // Spill data to storage. let instant = Instant::now(); - let unique_name = GlobalUniqName::unique(); - let location = format!("{}/{}", self.config.location_prefix, unique_name); - let mut writer = self - .operator - .writer_with(&location) - .chunk(8 * 1024 * 1024) - .await?; - for write_bucket_data in write_data.into_iter() { - for data in write_bucket_data.into_iter() { - writer.write(data).await?; - } - } - writer.close().await?; + let location = self.write_encodes(write_bytes, write_data).await?; // Record statistics. 
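The rewritten `spill` above no longer serializes inline: it builds an EncodedBlock (one serialized buffer per column) and hands it to `write_encodes`, which picks a local or remote location. The per-column byte lengths double as the `columns_layout` used on the read side. A hedged sketch of that encode/decode symmetry, assuming a `block: DataBlock` in scope; the flat buffer stands in for whatever lands on disk or object storage:

    // Sketch; EncodedBlock and deserialize_block are defined further down in this diff.
    let encoded = EncodedBlock::from_block(block);
    let layout = encoded.columns_layout();   // per-column serialized sizes, e.g. [n0, n1, ...]
    let bytes: Vec<u8> = encoded.0.concat(); // the concatenation written out by write_encodes
    let restored = deserialize_block(&layout, &bytes);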
- Profile::record_usize_profile(ProfileStatisticsName::SpillWriteCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::SpillWriteBytes, write_bytes as usize); - Profile::record_usize_profile( - ProfileStatisticsName::SpillWriteTime, - instant.elapsed().as_millis() as usize, - ); + match location { + Location::Remote(_) => record_remote_write_profile(&instant, write_bytes), + Location::Local(_) => record_local_write_profile(&instant, write_bytes), + } Ok(SpilledData::MergedPartition { location, @@ -227,43 +212,38 @@ impl Spiller { /// Read a certain file to a [`DataBlock`]. /// We should guarantee that the file is managed by this spiller. - pub async fn read_spilled_file(&self, file: &str) -> Result { - debug_assert!(self.columns_layout.contains_key(file)); + pub async fn read_spilled_file(&self, location: &Location) -> Result { + let columns_layout = self.columns_layout.get(location).unwrap(); // Read spilled data from storage. let instant = Instant::now(); - let data = self.operator.read(file).await?.to_bytes(); - - // Record statistics. - Profile::record_usize_profile(ProfileStatisticsName::SpillReadCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::SpillReadBytes, data.len()); - Profile::record_usize_profile( - ProfileStatisticsName::SpillReadTime, - instant.elapsed().as_millis() as usize, - ); - - // Deserialize data block. - let mut begin = 0; - let mut columns = Vec::with_capacity(self.columns_layout.len()); - let columns_layout = self.columns_layout.get(file).unwrap(); - for column_layout in columns_layout.iter() { - columns.push( - deserialize_column(&data[begin as usize..(begin + column_layout) as usize]) - .unwrap(), - ); - begin += column_layout; - } + let block = match location { + Location::Remote(loc) => { + let data = self.operator.read(loc).await?.to_bytes(); + record_remote_read_profile(&instant, data.len()); + deserialize_block(columns_layout, &data) + } + Location::Local(path) => { + let file_size = path.size(); + debug_assert_eq!(file_size, columns_layout.iter().sum::()); + let (buf, range) = dma_read_file_range(path, 0..file_size as u64).await?; + let data = &buf[range]; + record_local_read_profile(&instant, data.len()); + deserialize_block(columns_layout, data) + } + }; - Ok(DataBlock::new_from_columns(columns)) + Ok(block) } #[async_backtrace::framed] /// Read spilled data with partition id pub async fn read_spilled_partition(&mut self, p_id: &usize) -> Result> { - if let Some(files) = self.partition_location.get(p_id) { - let mut spilled_data = Vec::with_capacity(files.len()); - for file in files.iter() { - let block = self.read_spilled_file(file).await?; + if let Some(locs) = self.partition_location.get(p_id) { + let mut spilled_data = Vec::with_capacity(locs.len()); + for loc in locs.iter() { + let block = self.read_spilled_file(loc).await?; + if block.num_rows() != 0 { spilled_data.push(block); } @@ -285,30 +265,43 @@ impl Spiller { { // Read spilled data from storage. let instant = Instant::now(); - let data = self.operator.read(location).await?.to_bytes(); + + let data = match location { + Location::Remote(loc) => self.operator.read(loc).await?.to_bytes(), + Location::Local(path) => { + let file_size = path.size(); + debug_assert_eq!( + file_size, + if let Some((_, range, _)) = partitions.last() { + range.end + } else { + 0 + } + ); + + let (mut buf, range) = dma_read_file_range(path, 0..file_size as u64).await?; + assert_eq!(range.start, 0); + buf.truncate(range.end); + + dma_buffer_as_vec(buf).into() + } + }; // Record statistics. 
- Profile::record_usize_profile(ProfileStatisticsName::SpillReadCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::SpillReadBytes, data.len()); - Profile::record_usize_profile( - ProfileStatisticsName::SpillReadTime, - instant.elapsed().as_millis() as usize, - ); + match location { + Location::Remote(_) => record_remote_read_profile(&instant, data.len()), + Location::Local(_) => record_local_read_profile(&instant, data.len()), + }; // Deserialize partitioned data block. - let mut partitioned_data = Vec::with_capacity(partitions.len()); - for (partition_id, range, columns_layout) in partitions.iter() { - let mut begin = range.start; - let mut columns = Vec::with_capacity(columns_layout.len()); - for column_layout in columns_layout.iter() { - columns.push( - deserialize_column(&data[begin as usize..(begin + column_layout) as usize]) - .unwrap(), - ); - begin += column_layout; - } - partitioned_data.push((*partition_id, DataBlock::new_from_columns(columns))); - } + let partitioned_data = partitions + .iter() + .map(|(partition_id, range, columns_layout)| { + let block = deserialize_block(columns_layout, &data[range.clone()]); + (*partition_id, block) + }) + .collect(); + return Ok(partitioned_data); } Ok(vec![]) @@ -316,70 +309,166 @@ impl Spiller { pub async fn read_range( &self, - location: &str, - data_range: Range, - columns_layout: &[u64], + location: &Location, + data_range: Range, + columns_layout: &[usize], ) -> Result { // Read spilled data from storage. let instant = Instant::now(); - let data = self - .operator - .read_with(location) - .range(data_range) - .await? - .to_vec(); + let data_range = data_range.start as u64..data_range.end as u64; + + match location { + Location::Remote(loc) => { + let data = self + .operator + .read_with(loc) + .range(data_range) + .await? + .to_bytes(); + record_remote_read_profile(&instant, data.len()); + Ok(deserialize_block(columns_layout, &data)) + } + Location::Local(path) => { + let (buf, range) = dma_read_file_range(path, data_range).await?; + let data = &buf[range]; + record_local_read_profile(&instant, data.len()); + Ok(deserialize_block(columns_layout, data)) + } + } + } - // Record statistics. - Profile::record_usize_profile(ProfileStatisticsName::SpillReadCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::SpillReadBytes, data.len()); - Profile::record_usize_profile( - ProfileStatisticsName::SpillReadTime, - instant.elapsed().as_millis() as usize, - ); - - // Deserialize data block. 
- let mut begin = 0; - let mut columns = Vec::with_capacity(columns_layout.len()); - for column_layout in columns_layout.iter() { - columns.push( - deserialize_column(&data[begin as usize..(begin + column_layout) as usize]) - .unwrap(), - ); - begin += column_layout; + async fn write_encodes(&mut self, size: usize, blocks: Vec) -> Result { + let location = match &self.disk_spill { + None => None, + Some(disk) => disk.new_file_with_size(size)?.map(Location::Local), } + .unwrap_or(Location::Remote(format!( + "{}/{}", + self.location_prefix, + GlobalUniqName::unique(), + ))); + + let written = match &location { + Location::Remote(loc) => { + let mut writer = self + .operator + .writer_with(loc) + .chunk(8 * 1024 * 1024) + .await?; + + let mut written = 0; + for data in blocks.into_iter().flat_map(|x| x.0) { + written += data.len(); + writer.write(data).await?; + } - Ok(DataBlock::new_from_columns(columns)) + writer.close().await?; + written + } + Location::Local(path) => { + let bufs = blocks + .iter() + .flat_map(|x| &x.0) + .map(|data| io::IoSlice::new(data)) + .collect::>(); + + dma_write_file_vectored(path.as_ref(), &bufs).await? + } + }; + debug_assert_eq!(size, written); + Ok(location) } - pub(crate) fn spilled_files(&self) -> Vec { + pub(crate) fn spilled_files(&self) -> Vec { self.columns_layout.keys().cloned().collect() } - - // Serialize data block to (data_size, columns_data, columns_layout). - fn serialize_data_block(&self, data_block: DataBlock) -> Result<(u64, Vec>, Vec)> { - let num_columns = data_block.num_columns(); - let mut data_size = 0; - let mut columns_data = Vec::with_capacity(num_columns); - let mut columns_layout = Vec::with_capacity(num_columns); - - for column in data_block.columns() { - let column = column - .value - .convert_to_full_column(&column.data_type, data_block.num_rows()); - let column_data = serialize_column(&column); - - data_size += column_data.len() as u64; - columns_layout.push(column_data.len() as u64); - columns_data.push(column_data); - } - Ok((data_size, columns_data, columns_layout)) - } } pub enum SpilledData { - Partition(String), + Partition(Location), MergedPartition { - location: String, - partitions: Vec<(usize, Range, Vec)>, + location: Location, + partitions: Vec<(usize, Range, Vec)>, }, } + +#[derive(Debug, Clone, Hash, PartialEq, Eq)] +pub enum Location { + Remote(String), + Local(TempPath), +} + +pub struct EncodedBlock(pub Vec>); + +impl EncodedBlock { + pub fn from_block(block: DataBlock) -> Self { + let data = block + .columns() + .iter() + .map(|entry| { + let column = entry + .value + .convert_to_full_column(&entry.data_type, block.num_rows()); + serialize_column(&column) + }) + .collect(); + EncodedBlock(data) + } + + pub fn columns_layout(&self) -> Vec { + self.0.iter().map(|data| data.len()).collect() + } + + pub fn size(&self) -> usize { + self.0.iter().map(|data| data.len()).sum() + } +} + +pub fn deserialize_block(columns_layout: &[usize], mut data: &[u8]) -> DataBlock { + let columns = columns_layout + .iter() + .map(|layout| { + let (cur, remain) = data.split_at(*layout); + data = remain; + deserialize_column(cur).unwrap() + }) + .collect::>(); + + DataBlock::new_from_columns(columns) +} + +pub fn record_remote_write_profile(start: &Instant, write_bytes: usize) { + Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, 1); + Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteBytes, write_bytes); + Profile::record_usize_profile( + ProfileStatisticsName::RemoteSpillWriteTime, + 
start.elapsed().as_millis() as usize, + ); +} + +pub fn record_remote_read_profile(start: &Instant, read_bytes: usize) { + Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillReadCount, 1); + Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillReadBytes, read_bytes); + Profile::record_usize_profile( + ProfileStatisticsName::RemoteSpillReadTime, + start.elapsed().as_millis() as usize, + ); +} + +pub fn record_local_write_profile(start: &Instant, write_bytes: usize) { + Profile::record_usize_profile(ProfileStatisticsName::LocalSpillWriteCount, 1); + Profile::record_usize_profile(ProfileStatisticsName::LocalSpillWriteBytes, write_bytes); + Profile::record_usize_profile( + ProfileStatisticsName::LocalSpillWriteTime, + start.elapsed().as_millis() as usize, + ); +} + +pub fn record_local_read_profile(start: &Instant, read_bytes: usize) { + Profile::record_usize_profile(ProfileStatisticsName::LocalSpillReadCount, 1); + Profile::record_usize_profile(ProfileStatisticsName::LocalSpillReadBytes, read_bytes); + Profile::record_usize_profile( + ProfileStatisticsName::LocalSpillReadTime, + start.elapsed().as_millis() as usize, + ); +} diff --git a/src/query/service/src/test_kits/fixture.rs b/src/query/service/src/test_kits/fixture.rs index 115606c9b7d4..09fb1534168a 100644 --- a/src/query/service/src/test_kits/fixture.rs +++ b/src/query/service/src/test_kits/fixture.rs @@ -17,7 +17,6 @@ use std::str; use std::sync::Arc; use databend_common_ast::ast::Engine; -use databend_common_base::runtime::drop_guard; use databend_common_catalog::catalog_kind::CATALOG_DEFAULT; use databend_common_catalog::cluster_info::Cluster; use databend_common_config::InnerConfig; @@ -114,7 +113,7 @@ impl Drop for TestGuard { fn drop(&mut self) { #[cfg(debug_assertions)] { - drop_guard(move || { + databend_common_base::runtime::drop_guard(move || { databend_common_base::base::GlobalInstance::drop_testing(&self._thread_name); }) } diff --git a/src/query/service/tests/it/spillers/spiller.rs b/src/query/service/tests/it/spillers/spiller.rs index 387ccd5a4062..ad9779a7d615 100644 --- a/src/query/service/tests/it/spillers/spiller.rs +++ b/src/query/service/tests/it/spillers/spiller.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
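Before the updated unit test below, a hedged round-trip sketch of the reworked Spiller API from the hunks above: `spill` now returns a Location (remote object key or local TempPath), writes go through either the opendal writer or dma_write_file_vectored, and reads come back via operator.read or dma_read_file_range, which yields a buffer plus the sub-range holding the requested bytes (presumably padded for direct-I/O alignment). `spiller` and `block` are assumed to be in scope:

    // Sketch; `spiller` is a mutable Spiller built from the new SpillerConfig.
    let rows = block.num_rows();
    let location = spiller.spill(block).await?;           // Location::Remote(..) or Location::Local(..)
    let restored = spiller.read_spilled_file(&location).await?;
    assert_eq!(rows, restored.num_rows());
    if let Location::Local(path) = &location {
        // Local files are TempPath handles; dropping them releases the disk reservation.
        println!("spilled {} bytes to {}", path.size(), path.display());
    }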
+use std::assert_matches::assert_matches; + use databend_common_base::base::tokio; use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; @@ -24,6 +26,7 @@ use databend_common_expression::FromData; use databend_common_expression::ScalarRef; use databend_common_pipeline_core::query_spill_prefix; use databend_common_storage::DataOperator; +use databend_query::spillers::Location; use databend_query::spillers::Spiller; use databend_query::spillers::SpillerConfig; use databend_query::spillers::SpillerType; @@ -35,11 +38,14 @@ async fn test_spill_with_partition() -> Result<()> { let ctx = fixture.new_query_ctx().await?; let tenant = ctx.get_tenant(); - let spiller_config = - SpillerConfig::create(query_spill_prefix(tenant.tenant_name(), &ctx.get_id())); + let spiller_config = SpillerConfig { + location_prefix: query_spill_prefix(tenant.tenant_name(), &ctx.get_id()), + disk_spill: None, + spiller_type: SpillerType::HashJoinBuild, + }; let operator = DataOperator::instance().operator(); - let mut spiller = Spiller::create(ctx, operator, spiller_config, SpillerType::HashJoinBuild)?; + let mut spiller = Spiller::create(ctx, operator, spiller_config)?; // Generate data block: two columns, type is i32, 100 rows let data = DataBlock::new_from_columns(vec![ @@ -50,7 +56,8 @@ async fn test_spill_with_partition() -> Result<()> { let res = spiller.spill_with_partition(0, data).await; assert!(res.is_ok()); - assert!(spiller.partition_location.get(&0).unwrap()[0].starts_with("_query_spill")); + let location = &spiller.partition_location.get(&0).unwrap()[0]; + assert_matches!(location, Location::Remote(_)); // Test read spilled data let block = DataBlock::concat(&spiller.read_spilled_partition(&(0)).await?)?; diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index a1a6ddb005f1..50f9aeb27e5b 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -303,6 +303,12 @@ impl DefaultSettings { mode: SettingMode::Both, range: Some(SettingRange::Numeric(0..=u64::MAX)), }), + ("spilling_to_disk_vacuum_unknown_temp_dirs_limit", DefaultSettingValue { + value: UserSettingValue::UInt64(u64::MAX), + desc: "Set the maximum number of directories to clean up. 
If there are some temporary dirs when another query is unexpectedly interrupted, which needs to be cleaned up after this query.", + mode: SettingMode::Both, + range: Some(SettingRange::Numeric(0..=u64::MAX)), + }), ("enable_merge_into_row_fetch", DefaultSettingValue { value: UserSettingValue::UInt64(1), desc: "Enable merge into row fetch optimization.", @@ -460,6 +466,12 @@ impl DefaultSettings { mode: SettingMode::Both, range: Some(SettingRange::Numeric(0..=100)), }), + ("window_partition_spilling_to_disk_bytes_limit", DefaultSettingValue { + value: UserSettingValue::UInt64(0), + desc: "Sets the maximum amount of local disk in bytes that each window partitioner can use before spilling data to storage during query execution.", + mode: SettingMode::Both, + range: Some(SettingRange::Numeric(0..=u64::MAX)), + }), ("window_num_partitions", DefaultSettingValue { value: UserSettingValue::UInt64(256), desc: "Sets the number of partitions for window operator.", diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index e54806e1550f..55c2e46cd7e5 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -286,6 +286,10 @@ impl Settings { Ok(self.try_get_u64("join_spilling_buffer_threshold_per_proc_mb")? as usize) } + pub fn get_spilling_to_disk_vacuum_unknown_temp_dirs_limit(&self) -> Result { + Ok(self.try_get_u64("spilling_to_disk_vacuum_unknown_temp_dirs_limit")? as usize) + } + pub fn get_inlist_to_join_threshold(&self) -> Result { Ok(self.try_get_u64("inlist_to_join_threshold")? as usize) } @@ -395,6 +399,10 @@ impl Settings { Ok(self.try_get_u64("window_partition_spilling_bytes_threshold_per_proc")? as usize) } + pub fn get_window_partition_spilling_to_disk_bytes_limit(&self) -> Result { + Ok(self.try_get_u64("window_partition_spilling_to_disk_bytes_limit")? as usize) + } + pub fn get_window_partition_spilling_memory_ratio(&self) -> Result { Ok(self.try_get_u64("window_partition_spilling_memory_ratio")? as usize) } diff --git a/src/query/storages/common/cache/Cargo.toml b/src/query/storages/common/cache/Cargo.toml index d8c5b0bc4b08..7cb83f2577ee 100644 --- a/src/query/storages/common/cache/Cargo.toml +++ b/src/query/storages/common/cache/Cargo.toml @@ -31,6 +31,7 @@ hex = "0.4.3" log = { workspace = true } parking_lot = { workspace = true } rayon = "1.9.0" +rustix = "0.38.37" siphasher = "0.3.10" [dev-dependencies] diff --git a/src/query/storages/common/cache/src/lib.rs b/src/query/storages/common/cache/src/lib.rs index 0a7378134b71..8c70b627aeac 100644 --- a/src/query/storages/common/cache/src/lib.rs +++ b/src/query/storages/common/cache/src/lib.rs @@ -14,12 +14,14 @@ #![feature(write_all_vectored)] #![feature(associated_type_defaults)] +#![feature(assert_matches)] mod cache; mod caches; mod manager; mod providers; mod read; +mod temp_dir; pub use cache::CacheAccessor; pub use cache::Unit; @@ -45,3 +47,4 @@ pub use read::InMemoryCacheReader; pub use read::InMemoryItemCacheReader; pub use read::LoadParams; pub use read::Loader; +pub use temp_dir::*; diff --git a/src/query/storages/common/cache/src/temp_dir.rs b/src/query/storages/common/cache/src/temp_dir.rs new file mode 100644 index 000000000000..5493e9faf400 --- /dev/null +++ b/src/query/storages/common/cache/src/temp_dir.rs @@ -0,0 +1,410 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
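The two settings above gate the temp-dir machinery that starts here: `window_partition_spilling_to_disk_bytes_limit` is the per-query byte budget passed to `get_disk_spill_dir` (0 disables local spill), and `spilling_to_disk_vacuum_unknown_temp_dirs_limit` caps how many leftover directories `drop_disk_spill_dir_unknown` will remove after an interrupted query. A hedged usage sketch of the TempDirManager/TempDir/TempPath types defined below, mirroring the module's own unit tests; the query id is made up:

    // Sketch: assumes TempDirManager::init(&spill_config, tenant) ran at startup.
    let mgr = TempDirManager::instance();
    let dir = mgr
        .get_disk_spill_dir(1 << 30, "some_query_id")     // per-query limit in bytes, query id
        .expect("None when the limit is 0 or no spill path is configured");
    if let Some(path) = dir.new_file_with_size(4096)? {   // None if a dir/global/disk limit would be exceeded
        std::fs::write(&path, vec![0u8; 4096])?;          // TempPath derefs to Path
        drop(path);                                       // releases the 4096-byte reservation and unlinks the file
    }
    mgr.drop_disk_spill_dir("some_query_id")?;            // removes the whole per-query directory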
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::fmt::Debug; +use std::fs::create_dir; +use std::fs::create_dir_all; +use std::fs::remove_dir_all; +use std::hash::Hash; +use std::io::ErrorKind; +use std::ops::Deref; +use std::ops::Drop; +use std::path::Path; +use std::path::PathBuf; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::sync::Mutex; +use std::sync::Once; + +use databend_common_base::base::GlobalInstance; +use databend_common_base::base::GlobalUniqName; +use databend_common_config::SpillConfig; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use rustix::fs::statvfs; + +pub struct TempDirManager { + root: Option>, + + // Global limit in bytes + global_limit: usize, + // Reserved disk space in blocks + reserved: u64, + + group: Mutex, +} + +impl TempDirManager { + pub fn init(config: &SpillConfig, tenant_id: &str) -> Result<()> { + let (root, reserved) = if config.path.is_empty() { + (None, 0) + } else { + let path = PathBuf::from(&config.path) + .join(tenant_id) + .into_boxed_path(); + + if let Err(e) = remove_dir_all(&path) { + if !matches!(e.kind(), ErrorKind::NotFound) { + return Err(ErrorCode::StorageUnavailable(format!( + "can't clean temp dir: {e}", + ))); + } + } + + if create_dir_all(&path).is_err() { + (None, 0) + } else { + let stat = + statvfs(path.as_ref()).map_err(|e| ErrorCode::StorageOther(e.to_string()))?; + let reserved = (stat.f_blocks as f64 * *config.reserved_disk_ratio) as u64; + + (Some(path), reserved) + } + }; + + GlobalInstance::set(Arc::new(Self { + root, + global_limit: config.global_bytes_limit as usize, + reserved, + group: Mutex::new(Group { + dirs: HashMap::new(), + }), + })); + Ok(()) + } + + pub fn instance() -> Arc { + GlobalInstance::get() + } + + pub fn get_disk_spill_dir( + self: &Arc, + limit: usize, + query_id: &str, + ) -> Option> { + if limit == 0 { + return None; + } + + let path = self.root.as_ref()?.join(query_id).into_boxed_path(); + let mut group = self.group.lock().unwrap(); + let dir = match group.dirs.entry(path.clone()) { + Entry::Occupied(o) => TempDir { + path, + dir_info: o.get().clone(), + manager: self.clone(), + }, + Entry::Vacant(v) => { + let dir_info = Arc::new(DirInfo { + limit, + count: Default::default(), + size: Default::default(), + inited: Once::new(), + }); + v.insert(dir_info.clone()); + TempDir { + path, + dir_info, + manager: self.clone(), + } + } + }; + Some(Arc::new(dir)) + } + + pub fn drop_disk_spill_dir(self: &Arc, query_id: &str) -> Result { + let path = match self.root.as_ref() { + None => return Ok(false), + Some(root) => root.join(query_id).into_boxed_path(), + }; + + let mut group = self.group.lock().unwrap(); + if group.dirs.remove(&path).is_some() { + match remove_dir_all(&path) { + Ok(_) => return Ok(true), + Err(e) if matches!(e.kind(), ErrorKind::NotFound) => {} + res => res?, + } + } + Ok(false) + } + + pub fn drop_disk_spill_dir_unknown( + self: &Arc, + limit: usize, + ) -> Result>> { + match self.root.as_ref() { + None => Ok(vec![]), + Some(root) => { + let 
read_dir = std::fs::read_dir(root)?; + let group = self.group.lock().unwrap(); + let to_delete = read_dir + .filter_map(|entry| match entry { + Err(_) => None, + Ok(entry) => { + let path = entry.path().into_boxed_path(); + if group.dirs.contains_key(&path) { + None + } else { + Some(path) + } + } + }) + .take(limit) + .collect::>(); + drop(group); + for path in &to_delete { + remove_dir_all(path)?; + } + Ok(to_delete) + } + } + } + + fn insufficient_disk(&self, size: u64) -> Result { + let stat = statvfs(self.root.as_ref().unwrap().as_ref()) + .map_err(|e| ErrorCode::Internal(e.to_string()))?; + Ok(stat.f_bavail < self.reserved + (size + stat.f_frsize - 1) / stat.f_frsize) + } +} + +struct Group { + dirs: HashMap, Arc>, +} + +impl Group { + fn size(&self) -> usize { + self.dirs.values().map(|v| *v.size.lock().unwrap()).sum() + } +} + +#[derive(Clone)] +pub struct TempDir { + path: Box, + dir_info: Arc, + manager: Arc, +} + +impl TempDir { + pub fn new_file_with_size(&self, size: usize) -> Result> { + let path = self.path.join(GlobalUniqName::unique()).into_boxed_path(); + + if self.dir_info.limit < *self.dir_info.size.lock().unwrap() + size + || self.manager.global_limit < self.manager.group.lock().unwrap().size() + size + || self.manager.insufficient_disk(size as u64)? + { + return Ok(None); + } + + let mut dir_size = self.dir_info.size.lock().unwrap(); + if self.dir_info.limit < *dir_size + size { + return Ok(None); + } + + *dir_size += size; + drop(dir_size); + + self.init_dir()?; + + let dir_info = self.dir_info.clone(); + dir_info.count.fetch_add(1, Ordering::SeqCst); + + Ok(Some(TempPath(Arc::new(InnerPath { + path, + size, + dir_info, + })))) + } + + fn init_dir(&self) -> Result<()> { + let mut rt = Ok(()); + self.dir_info.inited.call_once(|| { + if let Err(e) = create_dir(&self.path) { + if !matches!(e.kind(), ErrorKind::AlreadyExists) { + rt = Err(e); + } + } + }); + Ok(rt?) 
+ } +} + +struct DirInfo { + limit: usize, + count: AtomicUsize, + size: Mutex, + inited: Once, +} + +impl Debug for DirInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DirInfo") + .field("limit", &self.limit) + .field("count", &self.count) + .field("size", &self.size) + .field("inited", &self.inited.is_completed()) + .finish() + } +} + +#[derive(Clone)] +pub struct TempPath(Arc); + +impl Debug for TempPath { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TempPath") + .field("path", &self.0.path) + .field("size", &self.0.size) + .field("dir_info", &self.0.dir_info) + .finish() + } +} + +impl Hash for TempPath { + fn hash(&self, state: &mut H) { + self.0.path.hash(state); + } +} + +impl PartialEq for TempPath { + fn eq(&self, other: &Self) -> bool { + self.0.path == other.0.path + } +} + +impl Eq for TempPath {} + +impl AsRef for TempPath { + fn as_ref(&self) -> &Path { + self.0.path.as_ref() + } +} + +impl Deref for TempPath { + type Target = Path; + + fn deref(&self) -> &Path { + self.as_ref() + } +} + +impl TempPath { + pub fn size(&self) -> usize { + self.0.size + } +} + +struct InnerPath { + path: Box, + size: usize, + dir_info: Arc, +} + +impl Drop for InnerPath { + fn drop(&mut self) { + let _ = std::fs::remove_file(&self.path); + + self.dir_info.count.fetch_sub(1, Ordering::SeqCst); + let mut guard = self.dir_info.size.lock().unwrap(); + *guard -= self.size; + } +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + use std::ffi::OsString; + use std::fs; + use std::sync::atomic::Ordering; + + use super::*; + + #[test] + fn test_temp_dir() -> Result<()> { + let thread = std::thread::current(); + GlobalInstance::init_testing(thread.name().unwrap()); + + let config = SpillConfig { + path: OsString::from("test_data"), + reserved_disk_ratio: 0.01.into(), + global_bytes_limit: 1 << 30, + }; + + TempDirManager::init(&config, "test_tenant")?; + + let mgr = TempDirManager::instance(); + let dir = mgr.get_disk_spill_dir(1 << 30, "some_query").unwrap(); + let path = dir.new_file_with_size(100)?.unwrap(); + + println!("{:?}", &path); + + fs::write(&path, vec![b'a'; 100])?; + + assert_eq!(1, dir.dir_info.count.load(Ordering::Relaxed)); + assert_eq!(100, *dir.dir_info.size.lock().unwrap()); + + let path_str = path.as_ref().to_str().unwrap().to_string(); + drop(path); + + assert_eq!(0, dir.dir_info.count.load(Ordering::Relaxed)); + assert_eq!(0, *dir.dir_info.size.lock().unwrap()); + + assert_matches!(fs::read_to_string(path_str), Err(_)); + + mgr.drop_disk_spill_dir("some_query")?; + + remove_dir_all("test_data")?; + + Ok(()) + } + + #[test] + fn test_drop_disk_spill_dir_unknown() -> Result<()> { + let thread = std::thread::current(); + GlobalInstance::init_testing(thread.name().unwrap()); + + let config = SpillConfig { + path: OsString::from("test_data2"), + reserved_disk_ratio: 0.99.into(), + global_bytes_limit: 1 << 30, + }; + + TempDirManager::init(&config, "test_tenant")?; + + let mgr = TempDirManager::instance(); + mgr.get_disk_spill_dir(1 << 30, "some_query").unwrap(); + + create_dir("test_data2/test_tenant/unknown_query1")?; + create_dir("test_data2/test_tenant/unknown_query2")?; + + let mut deleted = mgr.drop_disk_spill_dir_unknown(10)?; + + deleted.sort(); + + assert_eq!( + vec![ + PathBuf::from("test_data2/test_tenant/unknown_query1").into_boxed_path(), + PathBuf::from("test_data2/test_tenant/unknown_query2").into_boxed_path(), + ], + deleted + ); + + 
remove_dir_all("test_data2")?; + + Ok(()) + } +} diff --git a/tests/sqllogictests/suites/query/window_function/window_partition_spill.test b/tests/sqllogictests/suites/query/window_function/window_partition_spill.test index 2f268148905e..fccbd097486a 100644 --- a/tests/sqllogictests/suites/query/window_function/window_partition_spill.test +++ b/tests/sqllogictests/suites/query/window_function/window_partition_spill.test @@ -7,6 +7,9 @@ USE test_window_partition_spill statement ok set window_partition_spilling_bytes_threshold_per_proc = 1024 * 1024 * 1; +statement ok +set window_partition_spilling_to_disk_bytes_limit = 1024 * 1024 * 1024; + query T SELECT SUM(number + a + b) FROM ( diff --git a/tests/sqllogictests/suites/tpcds/spill.test b/tests/sqllogictests/suites/tpcds/spill.test index 9446d0209a66..366c85121b6d 100644 --- a/tests/sqllogictests/suites/tpcds/spill.test +++ b/tests/sqllogictests/suites/tpcds/spill.test @@ -23,6 +23,9 @@ set sort_spilling_bytes_threshold_per_proc = 1; statement ok set window_partition_spilling_memory_ratio = 1; +statement ok +set window_partition_spilling_to_disk_bytes_limit = 1024 * 1024 * 1024; + statement ok set window_partition_spilling_bytes_threshold_per_proc = 1; diff --git a/tests/sqllogictests/suites/tpch/spill.test b/tests/sqllogictests/suites/tpch/spill.test index 757154bbe84b..c393f2082b61 100644 --- a/tests/sqllogictests/suites/tpch/spill.test +++ b/tests/sqllogictests/suites/tpch/spill.test @@ -23,6 +23,9 @@ set sort_spilling_bytes_threshold_per_proc = 1; statement ok set window_partition_spilling_memory_ratio = 1; +statement ok +set window_partition_spilling_to_disk_bytes_limit = 1024 * 1024 * 1024; + statement ok set window_partition_spilling_bytes_threshold_per_proc = 1; From 25c2e4d4a867818fbd9d20a740acda04c81131e4 Mon Sep 17 00:00:00 2001 From: coldWater Date: Wed, 9 Oct 2024 09:27:50 +0800 Subject: [PATCH 2/3] feat(query): Simplified version Pattern S/L/D/G for number `to_char` (#16569) * f32 basic locale Signed-off-by: coldWater * fix Signed-off-by: coldWater * FmtCacheEntry Signed-off-by: coldWater * fix Signed-off-by: coldWater * fix Signed-off-by: coldWater * fix Signed-off-by: coldWater --------- Signed-off-by: coldWater --- src/common/io/src/number.rs | 206 +++++++++++++----- src/query/functions/src/scalars/other.rs | 48 +++- .../it/scalars/testdata/function_list.txt | 6 +- .../functions/02_0078_function_to_char.test | 48 ++-- 4 files changed, 225 insertions(+), 83 deletions(-) diff --git a/src/common/io/src/number.rs b/src/common/io/src/number.rs index 4806a3a2acc7..f4bb7d7aa717 100644 --- a/src/common/io/src/number.rs +++ b/src/common/io/src/number.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::str::FromStr; + use databend_common_exception::ErrorCode; use databend_common_exception::Result; use enumflags2::bitflags; @@ -259,6 +261,9 @@ impl NumDesc { if self.flag.contains(NumFlag::LSign) { return Err("cannot use \"S\" twice"); } + self.need_locale = true; + self.flag.insert(NumFlag::LSign); + if self .flag .intersects(NumFlag::Plus | NumFlag::Minus | NumFlag::Bracket) @@ -266,19 +271,13 @@ impl NumDesc { return Err("cannot use \"S\" and \"PL\"/\"MI\"/\"SG\"/\"PR\" together"); } - if self.flag.contains(NumFlag::Decimal) { + if !self.flag.contains(NumFlag::Decimal) { self.lsign = Some(NumLSign::Pre); self.pre_lsign_num = self.pre; - self.need_locale = true; - self.flag.insert(NumFlag::LSign); - return Ok(()); - } - - if self.lsign.is_none() { + } else { self.lsign = Some(NumLSign::Post); - self.need_locale = true; - self.flag.insert(NumFlag::LSign); } + Ok(()) } @@ -430,6 +429,14 @@ impl NumDesc { } fn f64_to_num_part(&mut self, value: f64) -> Result { + self.float_to_num_part(value, 15) + } + + fn f32_to_num_part(&mut self, value: f32) -> Result { + self.float_to_num_part(value as f64, 6) + } + + fn float_to_num_part(&mut self, value: f64, float_digits: usize) -> Result { if self.flag.contains(NumFlag::Roman) { return Err(ErrorCode::Unimplemented("to_char RN (Roman numeral)")); } @@ -463,12 +470,11 @@ impl NumDesc { let orgnum = format!("{:.0}", value.abs()); let numstr_pre_len = orgnum.len(); - const FLT_DIG: usize = 6; // adjust post digits to fit max float digits - if numstr_pre_len >= FLT_DIG { + if numstr_pre_len >= float_digits { self.post = 0; - } else if numstr_pre_len + self.post > FLT_DIG { - self.post = FLT_DIG - numstr_pre_len; + } else if numstr_pre_len + self.post > float_digits { + self.post = float_digits - numstr_pre_len; } let orgnum = format!("{:.*}", self.post, value.abs()); @@ -570,10 +576,6 @@ struct NumProc { num_curr: usize, // current position in number out_pre_spaces: usize, // spaces before first digit - _read_dec: bool, // to_number - was read dec. point - _read_post: usize, // to_number - number of dec. digit - _read_pre: usize, // to_number - number non-dec. 
digit - number: Vec, number_p: usize, @@ -584,8 +586,8 @@ struct NumProc { decimal: String, loc_negative_sign: String, loc_positive_sign: String, - _loc_thousands_sep: String, - _loc_currency_symbol: String, + loc_thousands_sep: String, + loc_currency_symbol: String, } impl NumProc { @@ -728,6 +730,58 @@ impl NumProc { fn last_relevant_is_dot(&self) -> bool { self.last_relevant.is_some_and(|(c, _)| c == '.') } + + fn prepare_locale(&mut self) { + // todo: True localization will be the next step + self.loc_negative_sign = "-".to_string(); + self.loc_positive_sign = "+".to_string(); + self.loc_thousands_sep = ",".to_string(); + self.loc_currency_symbol = "$".to_string(); + } +} + +pub struct FmtCacheEntry { + format: Vec, + _str: String, + desc: NumDesc, +} + +impl FromStr for FmtCacheEntry { + type Err = ErrorCode; + + fn from_str(s: &str) -> std::result::Result { + let mut desc = NumDesc::default(); + let format = parse_format(s, &NUM_KEYWORDS, Some(&mut desc))?; + Ok(FmtCacheEntry { + format, + _str: s.to_string(), + desc, + }) + } +} + +impl FmtCacheEntry { + pub fn process_i64(&self, value: i64) -> Result { + let desc = self.desc.clone(); + let num_part = desc.i64_to_num_part(value)?; + self.process(desc, num_part) + } + + pub fn process_f64(&self, value: f64) -> Result { + let mut desc = self.desc.clone(); + let num_part = desc.f64_to_num_part(value)?; + self.process(desc, num_part) + } + + pub fn process_f32(&self, value: f32) -> Result { + let mut desc = self.desc.clone(); + let num_part = desc.f32_to_num_part(value)?; + self.process(desc, num_part) + } + + fn process(&self, desc: NumDesc, num_part: NumPart) -> Result { + num_processor(&self.format, desc, num_part) + } } fn num_processor(nodes: &[FormatNode], desc: NumDesc, num_part: NumPart) -> Result { @@ -744,9 +798,6 @@ fn num_processor(nodes: &[FormatNode], desc: NumDesc, num_part: NumPart) -> Resu num_in: false, num_curr: 0, out_pre_spaces, - _read_dec: false, - _read_post: 0, - _read_pre: 0, number: number.chars().collect(), number_p: 0, inout: String::new(), @@ -754,8 +805,8 @@ fn num_processor(nodes: &[FormatNode], desc: NumDesc, num_part: NumPart) -> Resu decimal: ".".to_string(), loc_negative_sign: String::new(), loc_positive_sign: String::new(), - _loc_thousands_sep: String::new(), - _loc_currency_symbol: String::new(), + loc_thousands_sep: String::new(), + loc_currency_symbol: String::new(), }; if np.desc.zero_start > 0 { @@ -829,8 +880,7 @@ fn num_processor(nodes: &[FormatNode], desc: NumDesc, num_part: NumPart) -> Resu // Locale if np.desc.need_locale { - // NUM_prepare_locale(Np); - return Err(ErrorCode::Unimplemented("to_char uses locale S/L/D/G")); + np.prepare_locale(); } // Processor direct cycle @@ -838,6 +888,10 @@ fn num_processor(nodes: &[FormatNode], desc: NumDesc, num_part: NumPart) -> Resu match n { // Format pictures actions FormatNode::Action(key) => match key.id { + // Note: The locale sign is anchored to number and we + // write it when we work with first or last number + // (Tk0/Tk9). 
+ NumPoz::TkS => (), id @ (NumPoz::Tk9 | NumPoz::Tk0 | NumPoz::TkDec | NumPoz::TkD) => { np.numpart_to_char(id) } @@ -852,6 +906,23 @@ fn num_processor(nodes: &[FormatNode], desc: NumDesc, num_part: NumPart) -> Resu } } + NumPoz::TkG => { + if np.num_in { + np.inout.push_str(&np.loc_thousands_sep); + continue; + } + if np.desc.flag.contains(NumFlag::FillMode) { + continue; + } + let sep = " ".repeat(np.loc_thousands_sep.len()); + np.inout.push_str(&sep); + continue; + } + + NumPoz::TkL => { + np.inout.push_str(&np.loc_currency_symbol); + } + NumPoz::TkMI => { if np.sign { if !np.desc.flag.contains(NumFlag::FillMode) { @@ -890,30 +961,22 @@ fn num_processor(nodes: &[FormatNode], desc: NumDesc, num_part: NumPart) -> Resu Ok(np.inout) } -pub fn i64_to_char(value: i64, fmt: &str) -> Result { - // TODO: We should cache FormatNode - let mut desc = NumDesc::default(); - let nodes = parse_format(fmt, &NUM_KEYWORDS, Some(&mut desc))?; - - let num_part = desc.i64_to_num_part(value)?; - - num_processor(&nodes, desc, num_part) -} - -pub fn f64_to_char(value: f64, fmt: &str) -> Result { - // TODO: We should cache FormatNode - let mut desc = NumDesc::default(); - let nodes = parse_format(fmt, &NUM_KEYWORDS, Some(&mut desc))?; - - let num_part = desc.f64_to_num_part(value)?; - - num_processor(&nodes, desc, num_part) -} - #[cfg(test)] mod tests { use super::*; + fn i64_to_char(value: i64, fmt: &str) -> Result { + fmt.parse::()?.process_i64(value) + } + + fn f64_to_char(value: f64, fmt: &str) -> Result { + fmt.parse::()?.process_f64(value) + } + + fn f32_to_char(value: f32, fmt: &str) -> Result { + fmt.parse::()?.process_f32(value) + } + #[test] fn test_i64() -> Result<()> { assert_eq!(" 123", i64_to_char(123, "999")?); @@ -986,13 +1049,32 @@ mod tests { assert_eq!("485 ", i64_to_char(485, "999MI")?); assert_eq!("485", i64_to_char(485, "FM999MI")?); - // assert_eq!(" 1 485", i64_to_char(1485, "9G999")?); + assert_eq!(" 1,485", i64_to_char(1485, "9G999")?); + assert_eq!("-1,485", i64_to_char(-1485, "9G999")?); + + assert_eq!("1,485", i64_to_char(1485, "FM9G999")?); + assert_eq!("-1,485", i64_to_char(-1485, "FM9G999")?); + + assert_eq!(" 12,345,678", i64_to_char(12345678, "999G999G999")?); + assert_eq!(" -12,345,678", i64_to_char(-12345678, "999G999G999")?); + + assert_eq!("12,345,678", i64_to_char(12345678, "FM999G999G999")?); + assert_eq!("-12,345,678", i64_to_char(-12345678, "FM999G999G999")?); + + assert_eq!("$ 485", i64_to_char(485, "L999")?); + assert_eq!("$-485", i64_to_char(-485, "L999")?); + + assert_eq!("485+", i64_to_char(485, "999S")?); + assert_eq!("485-", i64_to_char(-485, "999S")?); + + assert_eq!("+485", i64_to_char(485, "S999")?); + assert_eq!("-485", i64_to_char(-485, "S999")?); Ok(()) } #[test] - fn test_f64() -> Result<()> { + fn test_float() -> Result<()> { assert_eq!(" 12.34", f64_to_char(12.34, "99.99")?); assert_eq!("-12.34", f64_to_char(-12.34, "99.99")?); assert_eq!(" .10", f64_to_char(0.1, "99.99")?); @@ -1014,11 +1096,31 @@ mod tests { f64_to_char(485.8, "\"Pre:\"999\" Post:\" .999")? ); - // assert_eq!(" 148,500", f64_to_char(148.5, "999D999")?); - // assert_eq!(" 3 148,500", f64_to_char(3148.5, "9G999D999")?); - // assert_eq!("485-", f64_to_char(-485, "999S")?); + assert_eq!(" 148.500", f64_to_char(148.5, "999D999")?); - // assert_eq!("DM 485", f64_to_char(485, "L999")?); + assert_eq!(" 0003148.50", f32_to_char(3148.5, "0009999.999")?); + assert_eq!(" 3,148.50", f32_to_char(3148.5, "9G999D999")?); + + assert_eq!( + " 1234567040", + f32_to_char(1.234_567e9, "99999999999D999999")? 
+ ); + assert_eq!( + " 1234567", + f32_to_char(1234567.0, "99999999999D999999")? + ); + assert_eq!( + " 1234.57", + f32_to_char(1.234_567e3, "99999999999D999999")? + ); + assert_eq!( + " 1.23457", + f32_to_char(1.234_567, "99999999999D999999")? + ); + assert_eq!( + " .00123", + f32_to_char(1.234_567e-3, "99999999999D999999")? + ); // assert_eq!(" CDLXXXV", f64_to_char(485, "RN")?); // assert_eq!("CDLXXXV", f64_to_char(485, "FMRN")?); diff --git a/src/query/functions/src/scalars/other.rs b/src/query/functions/src/scalars/other.rs index 9d1c98b934e7..2ed8f5eeb456 100644 --- a/src/query/functions/src/scalars/other.rs +++ b/src/query/functions/src/scalars/other.rs @@ -24,6 +24,7 @@ use databend_common_base::base::OrderedFloat; use databend_common_expression::error_to_null; use databend_common_expression::types::boolean::BooleanDomain; use databend_common_expression::types::nullable::NullableColumn; +use databend_common_expression::types::number::Float32Type; use databend_common_expression::types::number::Float64Type; use databend_common_expression::types::number::Int64Type; use databend_common_expression::types::number::UInt32Type; @@ -59,8 +60,7 @@ use databend_common_expression::Scalar; use databend_common_expression::ScalarRef; use databend_common_expression::Value; use databend_common_expression::ValueRef; -use databend_common_io::number::f64_to_char; -use databend_common_io::number::i64_to_char; +use databend_common_io::number::FmtCacheEntry; use rand::Rng; use rand::SeedableRng; @@ -401,7 +401,41 @@ fn register_num_to_char(registry: &mut FunctionRegistry) { } } - match i64_to_char(value, fmt) { + // TODO: We should cache FmtCacheEntry + match fmt + .parse::() + .and_then(|entry| entry.process_i64(value)) + { + Ok(s) => { + builder.put_str(&s); + builder.commit_row() + } + Err(e) => { + ctx.set_error(builder.len(), e.to_string()); + builder.commit_row() + } + } + }, + ), + ); + + registry.register_passthrough_nullable_2_arg::( + "to_char", + |_, _, _| FunctionDomain::MayThrow, + vectorize_with_builder_2_arg::( + |value, fmt, builder, ctx| { + if let Some(validity) = &ctx.validity { + if !validity.get_bit(builder.len()) { + builder.commit_row(); + return; + } + } + + // TODO: We should cache FmtCacheEntry + match fmt + .parse::() + .and_then(|entry| entry.process_f32(*value)) + { Ok(s) => { builder.put_str(&s); builder.commit_row() @@ -427,7 +461,11 @@ fn register_num_to_char(registry: &mut FunctionRegistry) { } } - match f64_to_char(*value, fmt) { + // TODO: We should cache FmtCacheEntry + match fmt + .parse::() + .and_then(|entry| entry.process_f64(*value)) + { Ok(s) => { builder.put_str(&s); builder.commit_row() @@ -439,7 +477,7 @@ fn register_num_to_char(registry: &mut FunctionRegistry) { } }, ), - ) + ); } /// Compute `grouping` by `grouping_id` and `cols`. 
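The Float32 overload above reuses the same FmtCacheEntry path as Int64 and Float64: the entry parses the format string once and can then format many values, which is what the `// TODO: We should cache FmtCacheEntry` comments point at. A hedged recap of what the simplified locale patterns produce, lifted from the unit tests earlier in this diff (sign, group separator, decimal point and currency symbol are hard-coded to +/-, `,`, `.` and `$` until real localization lands, per `prepare_locale`):

    // Sketch; FmtCacheEntry and its process_* methods are the ones added in number.rs above.
    let entry: FmtCacheEntry = "9G999".parse()?;
    assert_eq!(" 1,485", entry.process_i64(1485)?);
    assert_eq!("$ 485", "L999".parse::<FmtCacheEntry>()?.process_i64(485)?);
    assert_eq!("+485", "S999".parse::<FmtCacheEntry>()?.process_i64(485)?);
    assert_eq!(" 3,148.50", "9G999D999".parse::<FmtCacheEntry>()?.process_f32(3148.5)?);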
diff --git a/src/query/functions/tests/it/scalars/testdata/function_list.txt b/src/query/functions/tests/it/scalars/testdata/function_list.txt index 7dc2aa898d68..910c84e86b92 100644 --- a/src/query/functions/tests/it/scalars/testdata/function_list.txt +++ b/src/query/functions/tests/it/scalars/testdata/function_list.txt @@ -3736,8 +3736,10 @@ Functions overloads: 23 to_boolean(Float64 NULL) :: Boolean NULL 0 to_char(Int64, String) :: String 1 to_char(Int64 NULL, String NULL) :: String NULL -2 to_char(Float64, String) :: String -3 to_char(Float64 NULL, String NULL) :: String NULL +2 to_char(Float32, String) :: String +3 to_char(Float32 NULL, String NULL) :: String NULL +4 to_char(Float64, String) :: String +5 to_char(Float64 NULL, String NULL) :: String NULL 0 to_date(Variant) :: Date 1 to_date(Variant NULL) :: Date NULL 2 to_date(String, String) :: Date NULL diff --git a/tests/sqllogictests/suites/query/functions/02_0078_function_to_char.test b/tests/sqllogictests/suites/query/functions/02_0078_function_to_char.test index b73f95316b44..666af4d0fbfb 100644 --- a/tests/sqllogictests/suites/query/functions/02_0078_function_to_char.test +++ b/tests/sqllogictests/suites/query/functions/02_0078_function_to_char.test @@ -11,14 +11,14 @@ INSERT INTO INT64_TBL VALUES (+4567890123456789,'4567890123456789'), ('+4567890123456789','-4567890123456789'); -# query T -# SELECT to_char(q1, '9G999G999G999G999G999'), to_char(q2, '9,999,999,999,999,999') FROM INT64_TBL; -#------------------------+------------------------ -# 123 | 456 -# 123 | 4,567,890,123,456,789 -# 4,567,890,123,456,789 | 123 -# 4,567,890,123,456,789 | 4,567,890,123,456,789 -# 4,567,890,123,456,789 | -4,567,890,123,456,789 +query T +SELECT to_char(q1, '9G999G999G999G999G999'), to_char(q2, '9,999,999,999,999,999') FROM INT64_TBL; +---- + 123 456 + 123 4,567,890,123,456,789 + 4,567,890,123,456,789 123 + 4,567,890,123,456,789 4,567,890,123,456,789 + 4,567,890,123,456,789 -4,567,890,123,456,789 # SELECT to_char(q1, '9G999G999G999G999G999D999G999'), to_char(q2, '9,999,999,999,999,999.999,999') FROM INT64_TBL; query T @@ -39,14 +39,14 @@ SELECT to_char( (q1 * -1), '9999999999999999PR'), to_char( (q2 * -1), '999999999 <4567890123456789> <4567890123456789.000> <4567890123456789> 4567890123456789.000 -# query T -# SELECT to_char( (q1 * -1), '9999999999999999S'), to_char( (q2 * -1), 'S9999999999999999') FROM INT64_TBL; -#-------------------+------------------- -# 123- | -456 -# 123- | -4567890123456789 -# 4567890123456789- | -123 -# 4567890123456789- | -4567890123456789 -# 4567890123456789- | +4567890123456789 +query T +SELECT to_char( (q1 * -1), '9999999999999999S'), to_char( (q2 * -1), 'S9999999999999999') FROM INT64_TBL; +---- + 123- -456 + 123- -4567890123456789 + 4567890123456789- -123 + 4567890123456789- -4567890123456789 + 4567890123456789- +4567890123456789 query T SELECT to_char(q2, 'MI9999999999999999') FROM INT64_TBL; @@ -94,14 +94,14 @@ SELECT to_char(q2, '0999999999999999') FROM INT64_TBL; 4567890123456789 -4567890123456789 -# query T -# SELECT to_char(q2, 'S0999999999999999') FROM INT64_TBL; -#------------------- -# +0000000000000456 -# +4567890123456789 -# +0000000000000123 -# +4567890123456789 -# -4567890123456789 +query T +SELECT to_char(q2, 'S0999999999999999') FROM INT64_TBL; +---- + +0000000000000456 + +4567890123456789 + +0000000000000123 + +4567890123456789 + -4567890123456789 query T SELECT to_char(q2, 'FM0999999999999999') FROM INT64_TBL; From 0c550c5f52fa23ae25783b6900730dadddb41686 Mon Sep 17 00:00:00 2001 From: 
=?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Wed, 9 Oct 2024 10:54:13 +0800 Subject: [PATCH 3/3] refactor: Automatically chunk large key sets in KVPbApi methods (#16561) - Modify KVPbApi::get_pb_vec() and KVPbApi::get_pb_values_vec() to automatically split large key sets into chunks of 256 keys - Simplifies caller logic by eliminating the need for manual key set splitting --- Cargo.lock | 1 + src/meta/api/Cargo.toml | 1 + src/meta/api/src/kv_pb_api/mod.rs | 111 +++++++++++++++++++++++++--- src/meta/api/src/schema_api.rs | 7 +- src/meta/api/src/schema_api_impl.rs | 105 ++++++++++++-------------- 5 files changed, 152 insertions(+), 73 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 496eb42ab184..dfd8c5cd904f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3718,6 +3718,7 @@ dependencies = [ "databend-common-proto-conv", "fastrace 0.7.2", "futures", + "itertools 0.10.5", "log", "logcall", "maplit", diff --git a/src/meta/api/Cargo.toml b/src/meta/api/Cargo.toml index ebaa2b0a68e8..b3e1231e4104 100644 --- a/src/meta/api/Cargo.toml +++ b/src/meta/api/Cargo.toml @@ -27,6 +27,7 @@ databend-common-meta-types = { workspace = true } databend-common-proto-conv = { workspace = true } fastrace = { workspace = true } futures = { workspace = true } +itertools = { workspace = true } log = { workspace = true } logcall = { workspace = true } maplit = "1.0.2" diff --git a/src/meta/api/src/kv_pb_api/mod.rs b/src/meta/api/src/kv_pb_api/mod.rs index fc1caae81c9d..67b01bea97fb 100644 --- a/src/meta/api/src/kv_pb_api/mod.rs +++ b/src/meta/api/src/kv_pb_api/mod.rs @@ -40,6 +40,7 @@ use futures::stream; use futures::stream::BoxStream; use futures::stream::StreamExt; use futures::TryStreamExt; +use itertools::Itertools; pub(crate) use self::codec::decode_non_empty_item; pub(crate) use self::codec::decode_seqv; @@ -52,6 +53,9 @@ use crate::kv_pb_api::errors::StreamReadEof; /// This trait provides a way to access a kv store with `kvapi::Key` type key and protobuf encoded value. pub trait KVPbApi: KVApi { + /// The number of keys in one batch get. + const CHUNK_SIZE: usize = 256; + /// Update or insert a protobuf encoded value by kvapi::Key. /// /// The key will be converted to string and the value is encoded by `FromToProto`. @@ -189,21 +193,35 @@ pub trait KVPbApi: KVApi { } /// Same as [`get_pb_values`](Self::get_pb_values) but collect the result in a `Vec` instead of a stream. + /// + /// If the number of keys is larger than [`Self::CHUNK_SIZE`], it will be split into multiple requests. fn get_pb_values_vec( &self, keys: I, ) -> impl Future>>, Self::Error>> + Send where - K: kvapi::Key + 'static, + K: kvapi::Key + Send + 'static, K::ValueType: FromToProto + Send + 'static, I: IntoIterator + Send, + I::IntoIter: Send, Self::Error: From>, { + let it = keys.into_iter(); + let key_chunks = it + .chunks(Self::CHUNK_SIZE) + .into_iter() + .map(|x| x.collect::>()) + .collect::>(); + async move { - self.get_pb_values(keys) - .await? - .try_collect::>() - .await + let mut res = vec![]; + for chunk in key_chunks { + let strm = self.get_pb_values(chunk).await?; + + let vec = strm.try_collect::>().await?; + res.extend(vec); + } + Ok(res) } } @@ -241,6 +259,8 @@ pub trait KVPbApi: KVApi { } /// Same as [`get_pb_stream`](Self::get_pb_stream) but collect the result in a `Vec` instead of a stream. + /// + /// If the number of keys is larger than [`Self::CHUNK_SIZE`], it will be split into multiple requests. 
fn get_pb_vec( &self, keys: I, @@ -249,11 +269,25 @@ pub trait KVPbApi: KVApi { K: kvapi::Key + Send + 'static, K::ValueType: FromToProto + Send + 'static, I: IntoIterator + Send, + I::IntoIter: Send, Self::Error: From>, { + let it = keys.into_iter(); + let key_chunks = it + .chunks(Self::CHUNK_SIZE) + .into_iter() + .map(|x| x.collect::>()) + .collect::>(); + async move { - let kvs = self.get_pb_stream(keys).await?.try_collect().await?; - Ok(kvs) + let mut res = vec![]; + for chunk in key_chunks { + let strm = self.get_pb_stream(chunk).await?; + + let vec = strm.try_collect::>().await?; + res.extend(vec); + } + Ok(res) } } @@ -497,14 +531,14 @@ mod tests { use crate::kv_pb_api::KVPbApi; // - struct Foo { + struct FooKV { /// Whether to return without exhausting the input for `get_kv_stream`. early_return: Option, kvs: BTreeMap, } #[async_trait] - impl KVApi for Foo { + impl KVApi for FooKV { type Error = MetaError; async fn upsert_kv(&self, _req: UpsertKVReq) -> Result { @@ -560,7 +594,7 @@ mod tests { }; let v = catalog_meta.to_pb()?.encode_to_vec(); - let foo = Foo { + let foo = FooKV { early_return: Some(2), kvs: vec![ (s("__fd_catalog_by_id/1"), SeqV::new(1, v.clone())), @@ -613,7 +647,7 @@ mod tests { }; let v = catalog_meta.to_pb()?.encode_to_vec(); - let foo = Foo { + let foo = FooKV { early_return: None, kvs: vec![ (s("__fd_catalog_by_id/1"), SeqV::new(1, v.clone())), @@ -675,6 +709,61 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_get_pb_vec_span_chunk() -> anyhow::Result<()> { + let catalog_meta = CatalogMeta { + catalog_option: CatalogOption::Hive(HiveCatalogOption { + address: "127.0.0.1:10000".to_string(), + storage_params: None, + }), + created_on: DateTime::::MIN_UTC, + }; + let catalog_bytes = catalog_meta.to_pb()?.encode_to_vec(); + + let n = 1024; + let mut kvs = vec![]; + for i in 1..=n { + let key = s(format!("__fd_catalog_by_id/{}", i)); + let value = SeqV::new(i, catalog_bytes.clone()); + kvs.push((key, value)); + } + + let foo = FooKV { + early_return: None, + kvs: kvs.into_iter().collect(), + }; + + assert!(FooKV::CHUNK_SIZE < n as usize); + + let tenant = Tenant::new_literal("dummy"); + + { + let got = foo + .get_pb_vec((1..=n).map(|i| CatalogIdIdent::new(&tenant, i))) + .await?; + + for i in 1..=n { + let key = CatalogIdIdent::new(&tenant, i); + assert_eq!(key, got[i as usize - 1].0.clone()); + let value = got[i as usize - 1].1.clone().unwrap(); + assert_eq!(i, value.seq()); + } + } + + { + let got = foo + .get_pb_values_vec((1..=n).map(|i| CatalogIdIdent::new(&tenant, i))) + .await?; + + for i in 1..=n { + let value = got[i as usize - 1].clone().unwrap(); + assert_eq!(i, value.seq()); + } + } + + Ok(()) + } + fn s(x: impl ToString) -> String { x.to_string() } diff --git a/src/meta/api/src/schema_api.rs b/src/meta/api/src/schema_api.rs index 679f2aedd7e8..40dc98419346 100644 --- a/src/meta/api/src/schema_api.rs +++ b/src/meta/api/src/schema_api.rs @@ -124,14 +124,15 @@ pub trait SchemaApi: Send + Sync { async fn list_databases( &self, req: ListDatabaseReq, - ) -> Result>, KVAppError>; + ) -> Result>, MetaError>; async fn rename_database( &self, req: RenameDatabaseReq, ) -> Result; - /// Retrieves all databases for a specific tenant, including those marked as dropped. + /// Retrieves all databases for a specific tenant, + /// optionally including those marked as dropped. /// /// * `include_non_retainable` - /// If true, includes databases that are beyond the retention period. 
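The span-chunk test above works because `CHUNK_SIZE` is a defaulted associated const on the trait, so the in-memory `FooKV` fixture inherits the production value of 256 and 1024 keys are guaranteed to cross a chunk boundary. A small sketch of that associated-const pattern follows; `BatchGet`, `FooLikeKv`, `TinyChunkKv`, and `batches_for` are illustrative stand-ins, not the real API.

```rust
/// Stand-in trait showing the defaulted associated const: every implementor
/// gets CHUNK_SIZE = 256 unless it explicitly overrides it.
trait BatchGet {
    const CHUNK_SIZE: usize = 256;

    /// How many batched round-trips a key set of `n_keys` costs.
    fn batches_for(n_keys: usize) -> usize {
        n_keys.div_ceil(Self::CHUNK_SIZE)
    }
}

struct FooLikeKv;
impl BatchGet for FooLikeKv {} // keeps the default, like the FooKV fixture

struct TinyChunkKv;
impl BatchGet for TinyChunkKv {
    const CHUNK_SIZE: usize = 4; // hypothetical override, e.g. to hit chunk boundaries cheaply
}

fn main() {
    // 1024 keys against the default of 256 span four requests, which is why
    // the test asserts CHUNK_SIZE < n before checking result order.
    assert_eq!(FooLikeKv::batches_for(1024), 4);
    assert_eq!(TinyChunkKv::batches_for(10), 3);
}
```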
@@ -140,7 +141,7 @@ pub trait SchemaApi: Send + Sync { &self, req: ListDatabaseReq, include_non_retainable: bool, - ) -> Result>, KVAppError>; + ) -> Result>, MetaError>; // index diff --git a/src/meta/api/src/schema_api_impl.rs b/src/meta/api/src/schema_api_impl.rs index 7ec7a8ade59a..653ba10c9135 100644 --- a/src/meta/api/src/schema_api_impl.rs +++ b/src/meta/api/src/schema_api_impl.rs @@ -638,7 +638,7 @@ impl + ?Sized> SchemaApi for KV { &self, req: ListDatabaseReq, include_non_retainable: bool, - ) -> Result>, KVAppError> { + ) -> Result>, MetaError> { debug!(req :? =(&req); "SchemaApi: {}", func_name!()); let name_ident = DatabaseIdHistoryIdent::new(&req.tenant, "dummy"); @@ -652,26 +652,23 @@ impl + ?Sized> SchemaApi for KV { let ids = db_id_list .id_list .iter() - .map(|db_id| DatabaseId { db_id: *db_id }) - .collect::>(); + .map(|db_id| DatabaseId { db_id: *db_id }); - for db_ids in ids.chunks(DEFAULT_MGET_SIZE) { - let id_metas = self.get_pb_vec(db_ids.iter().cloned()).await?; + let id_metas = self.get_pb_vec(ids).await?; - for (db_id, db_meta) in id_metas { - let Some(db_meta) = db_meta else { - error!("get_database_history cannot find {:?} db_meta", db_id); - continue; - }; + for (db_id, db_meta) in id_metas { + let Some(db_meta) = db_meta else { + error!("get_database_history cannot find {:?} db_meta", db_id); + continue; + }; - let db = DatabaseInfo { - database_id: db_id, - name_ident: DatabaseNameIdent::new_from(db_id_list_key.clone()), - meta: db_meta, - }; + let db = DatabaseInfo { + database_id: db_id, + name_ident: DatabaseNameIdent::new_from(db_id_list_key.clone()), + meta: db_meta, + }; - dbs.insert(db_id.db_id, Arc::new(db)); - } + dbs.insert(db_id.db_id, Arc::new(db)); } } @@ -706,7 +703,7 @@ impl + ?Sized> SchemaApi for KV { async fn list_databases( &self, req: ListDatabaseReq, - ) -> Result>, KVAppError> { + ) -> Result>, MetaError> { debug!(req :? 
=(&req); "SchemaApi: {}", func_name!()); let name_key = DatabaseNameIdent::new(req.tenant(), "dummy"); @@ -1659,11 +1656,7 @@ impl + ?Sized> SchemaApi for KV { .map(|id| TableId { table_id: id }) .collect::>(); - let mut seq_metas = vec![]; - for chunk in ids.chunks(DEFAULT_MGET_SIZE) { - let got = self.get_pb_values_vec(chunk.to_vec()).await?; - seq_metas.extend(got); - } + let seq_metas = self.get_pb_values_vec(ids.clone()).await?; let res = names .into_iter() @@ -3091,27 +3084,22 @@ async fn get_table_meta_history( ) -> Result)>, KVAppError> { let mut tb_metas = vec![]; - let inner_keys = tb_id_list - .id_list - .into_iter() - .map(TableId::new) - .collect::>(); + let inner_keys = tb_id_list.id_list.into_iter().map(TableId::new); - for c in inner_keys.chunks(DEFAULT_MGET_SIZE) { - let kvs = kv_api.get_pb_vec(c.iter().cloned()).await?; + let kvs = kv_api.get_pb_vec(inner_keys).await?; - for (k, table_meta) in kvs { - let Some(table_meta) = table_meta else { - error!("get_table_history cannot find {:?} table_meta", k); - continue; - }; + for (k, table_meta) in kvs { + let Some(table_meta) = table_meta else { + error!("get_table_history cannot find {:?} table_meta", k); + continue; + }; - if !is_drop_time_retainable(table_meta.drop_on, *now) { - continue; - } - tb_metas.push((k, table_meta)); + if !is_drop_time_retainable(table_meta.drop_on, *now) { + continue; } + tb_metas.push((k, table_meta)); } + Ok(tb_metas) } @@ -3541,30 +3529,29 @@ async fn get_history_tables_for_gc( let mut filter_tb_infos = vec![]; - for chunk in args[..std::cmp::min(limit, args.len())].chunks(DEFAULT_MGET_SIZE) { - let table_id_idents = chunk.iter().map(|(table_id, _)| table_id.clone()); + let limited_args = &args[..std::cmp::min(limit, args.len())]; + let table_id_idents = limited_args.iter().map(|(table_id, _)| table_id.clone()); - let seq_metas = kv_api.get_pb_values_vec(table_id_idents).await?; + let seq_metas = kv_api.get_pb_values_vec(table_id_idents).await?; - for (seq_meta, (table_id, table_name)) in seq_metas.into_iter().zip(chunk.iter()) { - let Some(seq_meta) = seq_meta else { - error!( - "batch_filter_table_info cannot find {:?} table_meta", - table_id - ); - continue; - }; - - if !drop_time_range.contains(&seq_meta.data.drop_on) { - continue; - } + for (seq_meta, (table_id, table_name)) in seq_metas.into_iter().zip(limited_args.iter()) { + let Some(seq_meta) = seq_meta else { + error!( + "batch_filter_table_info cannot find {:?} table_meta", + table_id + ); + continue; + }; - filter_tb_infos.push(TableNIV::new( - DBIdTableName::new(db_id, table_name.clone()), - table_id.clone(), - seq_meta, - )); + if !drop_time_range.contains(&seq_meta.data.drop_on) { + continue; } + + filter_tb_infos.push(TableNIV::new( + DBIdTableName::new(db_id, table_name.clone()), + table_id.clone(), + seq_meta, + )); } Ok(filter_tb_infos)
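The reworked `get_history_tables_for_gc` above follows a zip-and-filter shape: slice the candidate list to the limit once, fetch all metas in a single batched call, then walk the metas zipped with the ids they were fetched for, skipping missing metas and anything dropped outside the retention window. A rough sketch of that shape with stand-in types (integer timestamps instead of `Option<DateTime<Utc>>`; `Candidate`, `Meta`, and `filter_for_gc` are hypothetical names):

```rust
use std::ops::Range;

#[derive(Debug, PartialEq)]
struct Candidate {
    table_id: u64,
}

struct Meta {
    drop_on: i64, // stand-in for the real drop marker
}

/// Apply the limit by slicing once, then zip the batched metas back with the
/// ids they were fetched for and keep only tables dropped inside the window.
fn filter_for_gc(
    args: &[(u64, String)],
    seq_metas: Vec<Option<Meta>>,
    drop_time_range: Range<i64>,
    limit: usize,
) -> Vec<Candidate> {
    let limited_args = &args[..limit.min(args.len())];
    let mut out = Vec::new();
    for (meta, (table_id, _table_name)) in seq_metas.into_iter().zip(limited_args) {
        let Some(meta) = meta else {
            continue; // the real code logs the missing meta and skips it
        };
        if !drop_time_range.contains(&meta.drop_on) {
            continue; // dropped outside the gc window
        }
        out.push(Candidate { table_id: *table_id });
    }
    out
}

fn main() {
    let args = vec![
        (1, "t1".to_string()),
        (2, "t2".to_string()),
        (3, "t3".to_string()),
    ];
    // Metas come back in the same order as the (limited) id list.
    let metas = vec![Some(Meta { drop_on: 5 }), None, Some(Meta { drop_on: 50 })];
    let picked = filter_for_gc(&args, metas, 0..10, 10);
    assert_eq!(picked, vec![Candidate { table_id: 1 }]);
}
```

Since the single batched call now replaces the per-caller `DEFAULT_MGET_SIZE` loop, the chunking lives entirely inside `KVPbApi`, which is the simplification this commit is after.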