From a415abb9aa89a0bfa6317ac40f4f5e92767c482a Mon Sep 17 00:00:00 2001 From: everpcpc Date: Wed, 3 Apr 2024 13:22:04 +0800 Subject: [PATCH] feat: compress with zstd for loading data (#376) --- cli/src/main.rs | 4 ---- driver/Cargo.toml | 1 + driver/src/rest_api.rs | 30 ++++++++++++++++++++++++++++-- 3 files changed, 29 insertions(+), 6 deletions(-) diff --git a/cli/src/main.rs b/cli/src/main.rs index 0b5415ce..c1892aae 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -66,24 +66,20 @@ impl InputFormat { options.insert("field_delimiter", ","); options.insert("quote", "\""); options.insert("skip_header", "0"); - options.insert("compression", "NONE"); } InputFormat::TSV => { options.insert("type", "TSV"); options.insert("record_delimiter", "\n"); options.insert("field_delimiter", "\t"); - options.insert("compression", "NONE"); } InputFormat::NDJSON => { options.insert("type", "NDJSON"); - options.insert("compression", "NONE"); } InputFormat::Parquet => { options.insert("type", "Parquet"); } InputFormat::XML => { options.insert("type", "XML"); - options.insert("compression", "NONE"); options.insert("row_tag", "row"); } } diff --git a/driver/Cargo.toml b/driver/Cargo.toml index 8b029b38..df02a234 100644 --- a/driver/Cargo.toml +++ b/driver/Cargo.toml @@ -30,6 +30,7 @@ databend-client = { workspace = true } databend-driver-macros = { workspace = true } databend-sql = { workspace = true } +async-compression = { version = "0.4", features = ["tokio", "zstd"] } async-trait = "0.1" chrono = { version = "0.4.35", default-features = false, features = ["clock"] } csv = "1.3" diff --git a/driver/src/rest_api.rs b/driver/src/rest_api.rs index d4c55427..eafbd583 100644 --- a/driver/src/rest_api.rs +++ b/driver/src/rest_api.rs @@ -14,14 +14,17 @@ use std::collections::{BTreeMap, VecDeque}; use std::future::Future; +use std::io::Cursor; use std::path::Path; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use async_compression::tokio::write::ZstdEncoder; use async_trait::async_trait; use log::info; use tokio::fs::File; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio_stream::Stream; use databend_client::error::Error as ClientError; @@ -113,10 +116,33 @@ impl Connection for RestAPIConnection { .timestamp_nanos_opt() .ok_or_else(|| Error::IO("Failed to get current timestamp".to_string()))?; let stage = format!("@~/client/load/{}", now); - self.upload_to_stage(&stage, data, size).await?; - let file_format_options = + + let mut file_format_options = file_format_options.unwrap_or_else(Self::default_file_format_options); let copy_options = copy_options.unwrap_or_else(Self::default_copy_options); + + let mut data = data; + let mut size = size; + + if file_format_options.get("compression").is_none() { + let mut buffer = Vec::new(); + let real_size = data.read_to_end(&mut buffer).await?; + if real_size != size as usize && size != 0 { + return Err(Error::IO(format!( + "Failed to read all data, expected: {}, read: {}", + size, real_size + ))); + } + let mut encoder = ZstdEncoder::new(Vec::new()); + encoder.write_all(&buffer).await?; + encoder.shutdown().await?; + file_format_options.insert("compression", "ZSTD"); + let output = encoder.into_inner(); + size = output.len() as u64; + data = Box::new(Cursor::new(output)) + } + + self.upload_to_stage(&stage, data, size).await?; let resp = self .client .insert_with_stage(sql, &stage, file_format_options, copy_options)