9 changes: 7 additions & 2 deletions .github/workflows/ci.yml
@@ -93,5 +93,10 @@ jobs:
aws --endpoint-url http://127.0.0.1:9000 s3 mb s3://timefusion-test || true
aws --endpoint-url http://127.0.0.1:9000 s3 mb s3://timefusion-tests || true

- name: Run all tests
run: cargo test --all-features -- --include-ignored
- name: Run tests
run: cargo test --all-features

- name: Run ignored tests (optional)
continue-on-error: true
timeout-minutes: 10
run: cargo test --all-features -- --ignored
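
Side note on the new optional step: `cargo test -- --ignored` only picks up tests annotated with `#[ignore]`. A minimal sketch of such a test, with an illustrative name and reason that are not taken from this repository:

```rust
#[cfg(test)]
mod tests {
    // Skipped by the regular `cargo test` step; executed only by the optional
    // `cargo test --all-features -- --ignored` step, which sets
    // continue-on-error so a failure does not fail the workflow.
    #[test]
    #[ignore = "needs a live MinIO endpoint"]
    fn s3_roundtrip_against_minio() {
        // hypothetical integration test body
    }
}
```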
2 changes: 0 additions & 2 deletions .github/workflows/deploy.yml
@@ -3,8 +3,6 @@ name: Build and Deploy
on:
push:
branches: [ master ]
pull_request:
branches: [ master ]

jobs:
build:
343 changes: 298 additions & 45 deletions src/database.rs

Large diffs are not rendered by default.

15 changes: 12 additions & 3 deletions src/dml.rs
@@ -436,8 +436,8 @@ pub async fn perform_delta_delete(database: &Database, table_name: &str, project
/// Common Delta operation logic
async fn perform_delta_operation<F, Fut>(database: &Database, table_name: &str, project_id: &str, operation: F) -> Result<u64>
where
F: FnOnce(deltalake::DeltaTable) -> Fut,
Fut: std::future::Future<Output = Result<(deltalake::DeltaTable, u64)>>,
F: FnOnce(deltalake::DeltaTable) -> Fut + Send + 'static,
Fut: std::future::Future<Output = Result<(deltalake::DeltaTable, u64)>> + Send,
{
let table_key = (project_id.to_string(), table_name.to_string());
let table_lock = database
@@ -449,7 +449,16 @@ where
.clone();

let delta_table = table_lock.write().await;
let (new_table, rows_affected) = operation(delta_table.clone()).await?;
let delta_table_clone = delta_table.clone();

// Use block_in_place to avoid conflicts with delta-rs's internal executor
// This is the same pattern used in insert_records_batch
let result = tokio::task::block_in_place(|| {
let handle = tokio::runtime::Handle::current();
handle.block_on(async move { operation(delta_table_clone).await })
});

let (new_table, rows_affected) = result?;

drop(delta_table);
*table_lock.write().await = new_table;
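
For readers unfamiliar with the pattern named in the comment above: a minimal, self-contained sketch of `block_in_place` plus `Handle::block_on`, assuming a multi-threaded Tokio runtime (the call panics on the current-thread flavor). This is general Tokio usage, not TimeFusion-specific code:

```rust
use tokio::{runtime::Handle, task};

// Mark the current worker thread as blocking so the scheduler shifts other
// tasks off it, then drive the future to completion on the existing runtime.
// This avoids nesting a second runtime inside the first.
fn run_on_current_runtime<F, T>(fut: F) -> T
where
    F: std::future::Future<Output = T>,
{
    task::block_in_place(|| Handle::current().block_on(fut))
}

#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() {
    let value = run_on_current_runtime(async { 40 + 2 });
    println!("{value}");
}
```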
152 changes: 123 additions & 29 deletions src/functions.rs
@@ -50,7 +50,42 @@ pub fn register_custom_functions(ctx: &mut datafusion::execution::context::Sessi

/// Create the to_char UDF for PostgreSQL-compatible timestamp formatting
fn create_to_char_udf() -> ScalarUDF {
let to_char_fn: ScalarFunctionImplementation = Arc::new(move |args: &[ColumnarValue]| -> datafusion::error::Result<ColumnarValue> {
ScalarUDF::from(ToCharUDF::new())
}

#[derive(Debug, Hash, Eq, PartialEq)]
struct ToCharUDF {
signature: Signature,
}

impl ToCharUDF {
fn new() -> Self {
Self {
// Accept any 2 arguments - we'll validate at runtime
signature: Signature::any(2, Volatility::Immutable),
}
}
}

impl ScalarUDFImpl for ToCharUDF {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
"to_char"
}

fn signature(&self) -> &Signature {
&self.signature
}

fn return_type(&self, _arg_types: &[DataType]) -> datafusion::error::Result<DataType> {
Ok(DataType::Utf8)
}

fn invoke_with_args(&self, args: ScalarFunctionArgs) -> datafusion::error::Result<ColumnarValue> {
let args = args.args;
if args.len() != 2 {
return Err(DataFusionError::Execution(
"to_char requires exactly 2 arguments: timestamp and format string".to_string(),
@@ -69,24 +104,25 @@ fn create_to_char_udf() -> ScalarUDF {
datafusion::scalar::ScalarValue::Utf8(Some(s)) => s.clone(),
_ => return Err(DataFusionError::Execution("Format string must be a UTF8 string".to_string())),
},
ColumnarValue::Array(_) => {
return Err(DataFusionError::Execution("Format string must be a scalar value".to_string()));
ColumnarValue::Array(arr) => {
// Try to get first element if it's a string array
if let Some(string_arr) = arr.as_any().downcast_ref::<StringArray>() {
if string_arr.len() > 0 && !string_arr.is_null(0) {
string_arr.value(0).to_string()
} else {
return Err(DataFusionError::Execution("Format string must be a non-null string".to_string()));
}
} else {
return Err(DataFusionError::Execution("Format string must be a UTF8 string".to_string()));
}
}
};

// Convert timestamps to formatted strings
let result = format_timestamps(&timestamp_array, &format_str)?;

Ok(ColumnarValue::Array(result))
});

create_udf(
"to_char",
vec![DataType::Timestamp(TimeUnit::Microsecond, Some(Arc::from("UTC"))), DataType::Utf8],
DataType::Utf8,
Volatility::Immutable,
to_char_fn,
)
}
}

/// Format timestamps according to PostgreSQL format patterns
@@ -150,7 +186,47 @@ fn postgres_to_chrono_format(pg_format: &str) -> String {

/// Create the AT TIME ZONE UDF for timezone conversion
fn create_at_time_zone_udf() -> ScalarUDF {
let at_time_zone_fn: ScalarFunctionImplementation = Arc::new(move |args: &[ColumnarValue]| -> datafusion::error::Result<ColumnarValue> {
ScalarUDF::from(AtTimeZoneUDF::new())
}

#[derive(Debug, Hash, Eq, PartialEq)]
struct AtTimeZoneUDF {
signature: Signature,
}

impl AtTimeZoneUDF {
fn new() -> Self {
Self {
// Accept any 2 arguments - we'll validate at runtime
signature: Signature::any(2, Volatility::Immutable),
}
}
}

impl ScalarUDFImpl for AtTimeZoneUDF {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
"at_time_zone"
}

fn signature(&self) -> &Signature {
&self.signature
}

fn return_type(&self, arg_types: &[DataType]) -> datafusion::error::Result<DataType> {
// Return timestamp with the same unit but no timezone annotation
// The actual timezone is added at runtime and we can't know it at planning time
match &arg_types[0] {
DataType::Timestamp(unit, _) => Ok(DataType::Timestamp(*unit, None)),
_ => Ok(DataType::Timestamp(TimeUnit::Microsecond, None)),
}
}

fn invoke_with_args(&self, args: ScalarFunctionArgs) -> datafusion::error::Result<ColumnarValue> {
let args = args.args;
if args.len() != 2 {
return Err(DataFusionError::Execution(
"AT TIME ZONE requires exactly 2 arguments: timestamp and timezone".to_string(),
@@ -169,24 +245,25 @@ fn create_at_time_zone_udf() -> ScalarUDF {
datafusion::scalar::ScalarValue::Utf8(Some(s)) => s.clone(),
_ => return Err(DataFusionError::Execution("Timezone must be a UTF8 string".to_string())),
},
ColumnarValue::Array(_) => {
return Err(DataFusionError::Execution("Timezone must be a scalar value".to_string()));
ColumnarValue::Array(arr) => {
// Try to get first element if it's a string array
if let Some(string_arr) = arr.as_any().downcast_ref::<StringArray>() {
if string_arr.len() > 0 && !string_arr.is_null(0) {
string_arr.value(0).to_string()
} else {
return Err(DataFusionError::Execution("Timezone must be a non-null string".to_string()));
}
} else {
return Err(DataFusionError::Execution("Timezone must be a UTF8 string".to_string()));
}
}
};

// Convert timestamps to the specified timezone
let result = convert_timezone(&timestamp_array, &tz_str)?;

Ok(ColumnarValue::Array(result))
});

create_udf(
"at_time_zone",
vec![DataType::Timestamp(TimeUnit::Microsecond, Some(Arc::from("UTC"))), DataType::Utf8],
DataType::Timestamp(TimeUnit::Microsecond, Some(Arc::from("UTC"))),
Volatility::Immutable,
at_time_zone_fn,
)
}
}

/// Convert timestamps to a different timezone
@@ -196,11 +273,13 @@ fn convert_timezone(timestamp_array: &ArrayRef, tz_str: &str) -> datafusion::err

// Handle microsecond timestamps (which is what we're using)
if let Some(timestamps) = timestamp_array.as_any().downcast_ref::<TimestampMicrosecondArray>() {
let mut builder = TimestampMicrosecondArray::builder(timestamps.len());
let mut values = Vec::with_capacity(timestamps.len());
let mut nulls = Vec::with_capacity(timestamps.len());

for i in 0..timestamps.len() {
if timestamps.is_null(i) {
builder.append_null();
values.push(0);
nulls.push(false);
} else {
let timestamp_us = timestamps.value(i);
let datetime =
@@ -210,11 +289,14 @@ fn convert_timezone(timestamp_array: &ArrayRef, tz_str: &str) -> datafusion::err
let converted = datetime.with_timezone(&tz);

// Convert back to UTC timestamp for storage
builder.append_value(converted.timestamp_micros());
values.push(converted.timestamp_micros());
nulls.push(true);
}
}

Ok(Arc::new(builder.finish()))
// Create array without timezone annotation (matches return_type which returns None)
let array = TimestampMicrosecondArray::from(values);
Ok(Arc::new(array))
} else if let Some(timestamps) = timestamp_array.as_any().downcast_ref::<TimestampNanosecondArray>() {
let mut builder = TimestampNanosecondArray::builder(timestamps.len());

@@ -490,7 +572,18 @@ fn array_to_json_values(array: &ArrayRef) -> datafusion::error::Result<Vec<JsonV
if string_array.is_null(i) {
values.push(JsonValue::Null);
} else {
values.push(JsonValue::String(string_array.value(i).to_string()));
let s = string_array.value(i);
// Try to parse as JSON if it looks like JSON (starts with { or [)
let trimmed = s.trim();
if (trimmed.starts_with('{') && trimmed.ends_with('}')) || (trimmed.starts_with('[') && trimmed.ends_with(']')) {
if let Ok(parsed) = serde_json::from_str::<JsonValue>(trimmed) {
values.push(parsed);
} else {
values.push(JsonValue::String(s.to_string()));
}
} else {
values.push(JsonValue::String(s.to_string()));
}
}
}
}
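
The parse-if-it-looks-like-JSON heuristic added above, restated as a standalone sketch for clarity; the helper name is illustrative, while the logic mirrors the diff:

```rust
use serde_json::Value as JsonValue;

// Attempt a JSON parse only when the trimmed string looks like an object or
// array; on parse failure, or for any other shape, fall back to a plain string.
fn string_to_json_value(s: &str) -> JsonValue {
    let trimmed = s.trim();
    let looks_like_json = (trimmed.starts_with('{') && trimmed.ends_with('}'))
        || (trimmed.starts_with('[') && trimmed.ends_with(']'));
    if looks_like_json {
        serde_json::from_str(trimmed).unwrap_or_else(|_| JsonValue::String(s.to_string()))
    } else {
        JsonValue::String(s.to_string())
    }
}

fn main() {
    assert_eq!(string_to_json_value(r#"{"a":1}"#), serde_json::json!({"a": 1}));
    assert_eq!(string_to_json_value("plain text"), JsonValue::String("plain text".into()));
}
```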
@@ -986,4 +1079,5 @@ mod tests {
assert!(parse_interval_to_micros("abc minutes").is_err());
assert!(parse_interval_to_micros("m5").is_err()); // unit before number
}

}
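
For the rewritten convert_timezone path, a minimal sketch of the chrono / chrono-tz calls it builds on, assuming those crates as dependencies. Note that `with_timezone` changes how the instant is displayed, not the instant itself, so `timestamp_micros()` round-trips unchanged; the diff then stores that value in an array without a timezone annotation, matching the UDF's `return_type`:

```rust
use chrono::{DateTime, Utc};
use chrono_tz::Tz;

fn main() {
    // 2024-01-01T12:00:00Z in microseconds since the Unix epoch.
    let timestamp_us: i64 = 1_704_110_400_000_000;
    let datetime: DateTime<Utc> =
        DateTime::from_timestamp_micros(timestamp_us).expect("in-range timestamp");

    // Re-express the same instant in the requested IANA zone.
    let tz: Tz = "America/New_York".parse().expect("valid timezone name");
    let converted = datetime.with_timezone(&tz);

    println!("{datetime} -> {converted}");
    assert_eq!(converted.timestamp_micros(), timestamp_us);
}
```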
28 changes: 27 additions & 1 deletion src/object_store_cache.rs
@@ -14,7 +14,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tracing::field::Empty;
use tracing::{Instrument, debug, info, instrument};

use foyer::{BlockEngineBuilder, DeviceBuilder, FsDeviceBuilder, HybridCache, HybridCacheBuilder, HybridCachePolicy, IoEngineBuilder, PsyncIoEngineBuilder};
use foyer::{BlockEngineBuilder, DeviceBuilder, FsDeviceBuilder, HybridCache, HybridCacheBuilder, HybridCachePolicy, IoEngineBuilder, PsyncIoEngineBuilder, RuntimeOptions, TokioRuntimeOptions};
use serde::{Deserialize, Serialize};
use tokio::sync::{Mutex, RwLock};
use tokio::task::JoinSet;
@@ -242,12 +242,26 @@ impl SharedFoyerCache {
let metadata_cache_dir = config.cache_dir.join("metadata");
std::fs::create_dir_all(&metadata_cache_dir)?;

// Configure a dedicated runtime for Foyer disk IO to avoid blocking the main tokio runtime
// This prevents blocking sync disk IO from starving the tokio runtime's network polling
let runtime_opts = RuntimeOptions::Separated {
read_runtime_options: TokioRuntimeOptions {
worker_threads: 2,
..Default::default()
},
write_runtime_options: TokioRuntimeOptions {
worker_threads: 2,
..Default::default()
},
};

let cache = HybridCacheBuilder::new()
.with_policy(HybridCachePolicy::WriteOnInsertion)
.memory(config.memory_size_bytes)
.with_shards(config.shards)
.with_weighter(|_key: &String, value: &CacheValue| value.data.len())
.storage()
.with_runtime_options(runtime_opts.clone())
.with_io_engine(PsyncIoEngineBuilder::new().build().await?)
.with_engine_config(
BlockEngineBuilder::new(FsDeviceBuilder::new(&config.cache_dir).with_capacity(config.disk_size_bytes).build()?)
@@ -256,12 +270,24 @@ impl SharedFoyerCache {
.build()
.await?;

let metadata_runtime_opts = RuntimeOptions::Separated {
read_runtime_options: TokioRuntimeOptions {
worker_threads: 2,
..Default::default()
},
write_runtime_options: TokioRuntimeOptions {
worker_threads: 2,
..Default::default()
},
};

let metadata_cache = HybridCacheBuilder::new()
.with_policy(HybridCachePolicy::WriteOnInsertion)
.memory(config.metadata_memory_size_bytes)
.with_shards(config.metadata_shards)
.with_weighter(|_key: &String, value: &CacheValue| value.data.len())
.storage()
.with_runtime_options(metadata_runtime_opts)
.with_io_engine(PsyncIoEngineBuilder::new().build().await?)
.with_engine_config(
BlockEngineBuilder::new(FsDeviceBuilder::new(&metadata_cache_dir).with_capacity(config.metadata_disk_size_bytes).build()?)
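
For background on the runtime-separation comments above: a minimal sketch, independent of Foyer, of pushing slow disk IO onto a dedicated Tokio runtime so the primary runtime stays free to poll network tasks. Thread counts, names, and the file path are illustrative:

```rust
use tokio::{runtime::Builder, sync::oneshot};

fn main() {
    // Dedicated two-thread runtime for cache/disk IO, analogous in spirit to
    // the Separated read/write runtime options configured in the diff.
    let io_runtime = Builder::new_multi_thread()
        .worker_threads(2)
        .thread_name("cache-io")
        .enable_all()
        .build()
        .expect("build io runtime");

    let (tx, rx) = oneshot::channel();
    io_runtime.spawn(async move {
        // Stand-in for a blocking psync-style read.
        let bytes = std::fs::read("/etc/hostname").unwrap_or_default();
        let _ = tx.send(bytes.len());
    });

    // The primary runtime keeps serving queries and network traffic meanwhile.
    let main_runtime = Builder::new_multi_thread()
        .worker_threads(2)
        .enable_all()
        .build()
        .expect("build main runtime");
    main_runtime.block_on(async {
        let n = rx.await.unwrap_or(0);
        println!("io runtime read {n} bytes");
    });
}
```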
1 change: 0 additions & 1 deletion src/pgwire_handlers.rs
@@ -204,7 +204,6 @@ impl ExtendedQueryHandler for LoggingExtendedQueryHandler {

// Get query text and determine type
let query = &portal.statement.statement.0;

let query_lower = query.trim().to_lowercase();
let (query_type, operation) = if query_lower.starts_with("select") || query_lower.contains(" select ") {
("SELECT", "SELECT")