Skip to content

Commit

Permalink
feat: compress with zstd for loading data (#376)
Browse files Browse the repository at this point in the history
  • Loading branch information
everpcpc authored Apr 3, 2024
1 parent 4ee348d commit a415abb
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 6 deletions.
4 changes: 0 additions & 4 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,24 +66,20 @@ impl InputFormat {
options.insert("field_delimiter", ",");
options.insert("quote", "\"");
options.insert("skip_header", "0");
options.insert("compression", "NONE");
}
InputFormat::TSV => {
options.insert("type", "TSV");
options.insert("record_delimiter", "\n");
options.insert("field_delimiter", "\t");
options.insert("compression", "NONE");
}
InputFormat::NDJSON => {
options.insert("type", "NDJSON");
options.insert("compression", "NONE");
}
InputFormat::Parquet => {
options.insert("type", "Parquet");
}
InputFormat::XML => {
options.insert("type", "XML");
options.insert("compression", "NONE");
options.insert("row_tag", "row");
}
}
Expand Down
1 change: 1 addition & 0 deletions driver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ databend-client = { workspace = true }
databend-driver-macros = { workspace = true }
databend-sql = { workspace = true }

async-compression = { version = "0.4", features = ["tokio", "zstd"] }
async-trait = "0.1"
chrono = { version = "0.4.35", default-features = false, features = ["clock"] }
csv = "1.3"
Expand Down
30 changes: 28 additions & 2 deletions driver/src/rest_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@

use std::collections::{BTreeMap, VecDeque};
use std::future::Future;
use std::io::Cursor;
use std::path::Path;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use async_compression::tokio::write::ZstdEncoder;
use async_trait::async_trait;
use log::info;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio_stream::Stream;

use databend_client::error::Error as ClientError;
Expand Down Expand Up @@ -113,10 +116,33 @@ impl Connection for RestAPIConnection {
.timestamp_nanos_opt()
.ok_or_else(|| Error::IO("Failed to get current timestamp".to_string()))?;
let stage = format!("@~/client/load/{}", now);
self.upload_to_stage(&stage, data, size).await?;
let file_format_options =

let mut file_format_options =
file_format_options.unwrap_or_else(Self::default_file_format_options);
let copy_options = copy_options.unwrap_or_else(Self::default_copy_options);

let mut data = data;
let mut size = size;

if file_format_options.get("compression").is_none() {
let mut buffer = Vec::new();
let real_size = data.read_to_end(&mut buffer).await?;
if real_size != size as usize && size != 0 {
return Err(Error::IO(format!(
"Failed to read all data, expected: {}, read: {}",
size, real_size
)));
}
let mut encoder = ZstdEncoder::new(Vec::new());
encoder.write_all(&buffer).await?;
encoder.shutdown().await?;
file_format_options.insert("compression", "ZSTD");
let output = encoder.into_inner();
size = output.len() as u64;
data = Box::new(Cursor::new(output))
}

self.upload_to_stage(&stage, data, size).await?;
let resp = self
.client
.insert_with_stage(sql, &stage, file_format_options, copy_options)
Expand Down

0 comments on commit a415abb

Please sign in to comment.