569 changes: 265 additions & 304 deletions Cargo.lock

Large diffs are not rendered by default.

25 changes: 13 additions & 12 deletions Cargo.toml
@@ -5,8 +5,8 @@ edition = "2024"

[dependencies]
tokio = { version = "1.48", features = ["full"] }
datafusion = "51.0.0"
datafusion-datasource = "51.0.0"
datafusion = "52.1.0"
datafusion-datasource = "52.1.0"
arrow = "57.1.0"
arrow-json = "57.1.0"
uuid = { version = "1.17", features = ["v4", "serde"] }
@@ -16,17 +16,16 @@ serde_json = "1.0.141"
serde_with = "3.14"
serde_yaml = "0.9"
async-trait = "0.1.86"
env_logger = "0.11.6"
log = "0.4.27"
color-eyre = "0.6.5"
arrow-schema = "57.1.0"
regex = "1.11.1"
# Updated to latest delta-rs with datafusion 51 and arrow 57 support
deltalake = { git = "https://github.com/delta-io/delta-rs.git", rev = "cacb6c668f535bccfee182cd4ff3b6375b1a4e25", features = [
# Updated to delta-rs with datafusion 52 Utf8View fixes (includes commits 987e535f, ffb794ba)
deltalake = { git = "https://github.com/delta-io/delta-rs.git", rev = "ffb794ba0745394fc4b747a4ef2e11c2d4ec086a", features = [
"datafusion",
"s3",
] }
delta_kernel = { version = "0.19.0", features = [
delta_kernel = { version = "0.19.1", features = [
"arrow-conversion",
"default-engine-rustls",
"arrow-57",
@@ -42,8 +41,8 @@ sqlx = { version = "0.8", features = [
futures = { version = "0.3.31", features = ["alloc"] }
bytes = "1.4"
tokio-rustls = "0.26.1"
datafusion-postgres = "0.13.0"
datafusion-functions-json = "0.51.0"
datafusion-postgres = "0.14.0"
datafusion-functions-json = "0.52.0"
anyhow = "1.0.100"
tokio-util = "0.7.17"
tokio-stream = { version = "0.1.17", features = ["net"] }
@@ -53,8 +52,8 @@ tracing-opentelemetry = "0.32"
opentelemetry = "0.31"
opentelemetry-otlp = { version = "0.31", features = ["grpc-tonic"] }
opentelemetry_sdk = { version = "0.31", features = ["rt-tokio"] }
datafusion-tracing = { git = "https://github.com/datafusion-contrib/datafusion-tracing.git" }
instrumented-object-store = { git = "https://github.com/datafusion-contrib/datafusion-tracing.git" }
datafusion-tracing = { git = "https://github.com/datafusion-contrib/datafusion-tracing.git", rev = "43734ac7a87eacb599d1d855a21c8c157d71acbb" }
instrumented-object-store = { git = "https://github.com/datafusion-contrib/datafusion-tracing.git", rev = "43734ac7a87eacb599d1d855a21c8c157d71acbb" }
dotenv = "0.15.0"
include_dir = "0.7"
aws-config = { version = "1.6.0", features = ["behavior-version-latest"] }
@@ -63,12 +62,13 @@ aws-sdk-s3 = "1.3.0"
aws-sdk-dynamodb = "1.3.0"
url = "2.5.4"
tokio-cron-scheduler = "0.15"
object_store = "0.12.3"
object_store = "0.12.4"
foyer = { version = "0.21.1", features = ["serde"] }
ahash = "0.8"
lru = "0.16.1"
serde_bytes = "0.11.19"
dashmap = "6.1"
parking_lot = "0.12"
envy = "0.4"
tdigests = "1.0"
bincode = { version = "2.0", features = ["serde"] }
@@ -79,11 +79,12 @@ strum = { version = "0.27", features = ["derive"] }
[dev-dependencies]
sqllogictest = { git = "https://github.com/risinglightdb/sqllogictest-rs.git" }
serial_test = "3.2.0"
datafusion-common = "51.0.0"
datafusion-common = "52.1.0"
tokio-postgres = { version = "0.7.10", features = ["with-chrono-0_4"] }
scopeguard = "1.2.0"
rand = "0.9.2"
tempfile = "3"
test-case = "3.3"

[features]
default = []
113 changes: 87 additions & 26 deletions src/buffered_write_layer.rs
@@ -14,6 +14,14 @@ use tracing::{debug, error, info, instrument, warn};
// 20% overhead accounts for DashMap internal structures, RwLock wrappers,
// Arc<Schema> refs, and Arrow buffer alignment padding
const MEMORY_OVERHEAD_MULTIPLIER: f64 = 1.2;
/// Hard limit multiplier (120%) provides headroom for in-flight writes while preventing OOM
const HARD_LIMIT_MULTIPLIER: usize = 5; // max_bytes + max_bytes/5 = 120%
/// Maximum CAS retry attempts before failing
const MAX_CAS_RETRIES: u32 = 100;
/// Base backoff delay in microseconds for CAS retries
const CAS_BACKOFF_BASE_MICROS: u64 = 1;
/// Maximum backoff exponent (caps delay at ~1ms)
const CAS_BACKOFF_MAX_EXPONENT: u32 = 10;

#[derive(Debug, Default)]
pub struct RecoveryStats {
@@ -25,6 +33,13 @@ pub struct RecoveryStats {
pub corrupted_entries_skipped: u64,
}

#[derive(Debug, Default)]
pub struct FlushStats {
pub buckets_flushed: u64,
pub buckets_failed: u64,
pub total_rows: u64,
}

/// Callback for writing batches to Delta Lake. The callback MUST:
/// - Complete the Delta commit (including S3 upload) before returning Ok
/// - Return Err if the commit fails for any reason
@@ -93,16 +108,15 @@ impl BufferedWriteLayer {

/// Try to reserve memory atomically before a write.
/// Returns estimated batch size on success, or error if hard limit exceeded.
/// Callers MUST implement retry logic - hard failures may cause data loss.
/// Uses exponential backoff to reduce CPU thrashing under contention.
fn try_reserve_memory(&self, batches: &[RecordBatch]) -> anyhow::Result<usize> {
let batch_size: usize = batches.iter().map(estimate_batch_size).sum();
let estimated_size = (batch_size as f64 * MEMORY_OVERHEAD_MULTIPLIER) as usize;

let max_bytes = self.max_memory_bytes();
// Hard limit at 120% provides headroom for in-flight writes while preventing OOM
let hard_limit = max_bytes.saturating_add(max_bytes / 5);
let hard_limit = max_bytes.saturating_add(max_bytes / HARD_LIMIT_MULTIPLIER);

for _ in 0..100 {
for attempt in 0..MAX_CAS_RETRIES {
let current_reserved = self.reserved_bytes.load(Ordering::Acquire);
let current_mem = self.mem_buffer.estimated_memory_bytes();
let new_total = current_mem + current_reserved + estimated_size;
@@ -123,8 +137,20 @@
{
return Ok(estimated_size);
}

// Exponential backoff: spin_loop for first few attempts, then brief sleep.
// Note: Using std::thread::sleep in this sync function called from async context.
// This is acceptable because: (1) max sleep is ~1ms, (2) only under high contention,
// (3) converting to async would require spawn_blocking which adds more overhead.
if attempt < 5 {
std::hint::spin_loop();
} else {
// Max backoff = 1μs << 10 = 1024μs ≈ 1ms
let backoff_micros = CAS_BACKOFF_BASE_MICROS << attempt.min(CAS_BACKOFF_MAX_EXPONENT);
std::thread::sleep(std::time::Duration::from_micros(backoff_micros));
}
}
anyhow::bail!("Failed to reserve memory after 100 retries due to contention")
anyhow::bail!("Failed to reserve memory after {} retries due to contention", MAX_CAS_RETRIES)
}
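
The reservation loop above pairs a compare-and-swap on the reserved-bytes counter with a spin-then-sleep backoff. A minimal standalone sketch of that pattern, using only `std` and hypothetical names and limits rather than the PR's `BufferedWriteLayer` internals:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;

const MAX_RETRIES: u32 = 100;
const BACKOFF_BASE_MICROS: u64 = 1;
const BACKOFF_MAX_EXPONENT: u32 = 10; // caps the sleep at ~1ms

/// Try to add `size` bytes to `reserved`, failing once `hard_limit` would be exceeded.
fn try_reserve(reserved: &AtomicUsize, size: usize, hard_limit: usize) -> Result<usize, String> {
    for attempt in 0..MAX_RETRIES {
        let current = reserved.load(Ordering::Acquire);
        let new_total = current.saturating_add(size);
        if new_total > hard_limit {
            return Err(format!("hard limit exceeded: {new_total} > {hard_limit}"));
        }
        // CAS succeeds only if no other thread touched the counter in the meantime.
        if reserved
            .compare_exchange(current, new_total, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
        {
            return Ok(size);
        }
        // Spin briefly on early attempts, then back off exponentially.
        if attempt < 5 {
            std::hint::spin_loop();
        } else {
            let micros = BACKOFF_BASE_MICROS << attempt.min(BACKOFF_MAX_EXPONENT);
            std::thread::sleep(Duration::from_micros(micros));
        }
    }
    Err(format!("failed to reserve after {MAX_RETRIES} retries"))
}
```
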

fn release_reservation(&self, size: usize) {
@@ -169,6 +195,12 @@ impl BufferedWriteLayer {
self.release_reservation(reserved_size);

result?;

// Immediate flush mode: flush after every insert
if self.config.buffer.flush_immediately() {
self.flush_all_now().await?;
}

debug!("BufferedWriteLayer insert complete: project={}, table={}", project_id, table_name);
Ok(())
}
@@ -202,7 +234,7 @@

for entry in entries {
match entry.operation {
WalOperation::Insert => match WalManager::deserialize_batch(&entry.data) {
WalOperation::Insert => match WalManager::deserialize_batch(&entry.data, &entry.table_name) {
Ok(batch) => {
self.mem_buffer.insert(&entry.project_id, &entry.table_name, batch, entry.timestamp_micros)?;
entries_replayed += 1;
@@ -332,7 +364,7 @@ impl BufferedWriteLayer {
return Ok(());
}

info!("Flushing {} buckets to Delta", flushable.len());
debug!("Flushing {} buckets to Delta", flushable.len());

// Flush buckets in parallel with bounded concurrency
let parallelism = self.config.buffer.flush_parallelism();
Expand Down Expand Up @@ -442,6 +474,35 @@ impl BufferedWriteLayer {
Ok(())
}

/// Force flush all buffered data to Delta immediately.
pub async fn flush_all_now(&self) -> anyhow::Result<FlushStats> {
let _flush_guard = self.flush_lock.lock().await;
let all_buckets = self.mem_buffer.get_all_buckets();
let mut stats = FlushStats {
total_rows: all_buckets.iter().map(|b| b.row_count as u64).sum(),
..Default::default()
};

for bucket in all_buckets {
match self.flush_bucket(&bucket).await {
Ok(()) => {
self.checkpoint_and_drain(&bucket);
stats.buckets_flushed += 1;
}
Err(e) => {
error!("flush_all_now: failed bucket {}: {}", bucket.bucket_id, e);
stats.buckets_failed += 1;
}
}
}
Ok(stats)
}

/// Check if buffer is empty (all data flushed).
pub fn is_empty(&self) -> bool {
self.mem_buffer.get_stats().total_rows == 0
}
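
Together, the new `flush_all_now`, `FlushStats`, and `is_empty` give callers a force-drain path. A hedged usage sketch; the `layer` value and the shutdown context are assumed, not taken from this diff:

```rust
// Sketch only: assumes a constructed BufferedWriteLayer `layer` from elsewhere in the crate.
async fn drain_on_shutdown(layer: &BufferedWriteLayer) -> anyhow::Result<()> {
    // Force-flush every buffered bucket and report what happened.
    let stats = layer.flush_all_now().await?;
    tracing::info!(
        "flush_all_now: {} buckets flushed, {} failed, {} rows total",
        stats.buckets_flushed,
        stats.buckets_failed,
        stats.total_rows
    );
    // Treat any remaining rows (e.g. from failed buckets) as an error at shutdown.
    if !layer.is_empty() {
        anyhow::bail!("buffer still holds rows after flush_all_now");
    }
    Ok(())
}
```
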

pub fn get_stats(&self) -> MemBufferStats {
self.mem_buffer.get_stats()
}
@@ -503,8 +564,8 @@ impl BufferedWriteLayer {
#[cfg(test)]
mod tests {
use super::*;
use arrow::array::{Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use crate::test_utils::test_helpers::{json_to_batch, test_span};
use serial_test::serial;
use std::path::PathBuf;
use tempfile::tempdir;

@@ -514,14 +575,14 @@ mod tests {
Arc::new(cfg)
}

fn create_test_batch() -> RecordBatch {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("name", DataType::Utf8, false),
]));
let id_array = Int64Array::from(vec![1, 2, 3]);
let name_array = StringArray::from(vec!["a", "b", "c"]);
RecordBatch::try_new(schema, vec![Arc::new(id_array), Arc::new(name_array)]).unwrap()
fn create_test_batch(project_id: &str) -> RecordBatch {
// Use test_span helper which creates data matching the default schema
json_to_batch(vec![
test_span("test1", "span1", project_id),
test_span("test2", "span2", project_id),
test_span("test3", "span3", project_id),
])
.unwrap()
}

#[tokio::test]
@@ -535,7 +596,7 @@ mod tests {
let table = format!("t{}", test_id);

let layer = BufferedWriteLayer::with_config(cfg).unwrap();
let batch = create_test_batch();
let batch = create_test_batch(&project);

layer.insert(&project, &table, vec![batch.clone()]).await.unwrap();

Expand All @@ -544,15 +605,16 @@ mod tests {
assert_eq!(results[0].num_rows(), 3);
}

// NOTE: This test is ignored because walrus-rust creates new files for each instance
// rather than discovering existing files from previous instances in the same directory.
// This is a limitation of the walrus library, not our code.
#[ignore]
#[serial]
#[tokio::test]
async fn test_recovery() {
let dir = tempdir().unwrap();
let cfg = create_test_config(dir.path().to_path_buf());

// SAFETY: walrus-rust reads WALRUS_DATA_DIR from environment. We use #[serial]
// to prevent concurrent access to this process-global state.
unsafe { std::env::set_var("WALRUS_DATA_DIR", &cfg.core.walrus_data_dir) };

// Use unique but short project/table names (walrus has metadata size limit)
let test_id = &uuid::Uuid::new_v4().to_string()[..4];
let project = format!("r{}", test_id);
@@ -561,10 +623,9 @@
// First instance - write data
{
let layer = BufferedWriteLayer::with_config(Arc::clone(&cfg)).unwrap();
let batch = create_test_batch();
let batch = create_test_batch(&project);
layer.insert(&project, &table, vec![batch]).await.unwrap();
// Shutdown to ensure WAL is synced
layer.shutdown().await.unwrap();
// Layer drops here - WAL data should be persisted
}

// Second instance - recover from WAL
@@ -591,7 +652,7 @@
let layer = BufferedWriteLayer::with_config(cfg).unwrap();

// First insert should succeed
let batch = create_test_batch();
let batch = create_test_batch(&project);
layer.insert(&project, &table, vec![batch]).await.unwrap();

// Verify reservation is released (should be 0 after successful insert)
26 changes: 16 additions & 10 deletions src/config.rs
@@ -198,18 +198,19 @@ impl AwsConfig {
}

let mut opts = HashMap::new();
insert_opt!(opts, "aws_access_key_id", self.aws_access_key_id);
insert_opt!(opts, "aws_secret_access_key", self.aws_secret_access_key);
insert_opt!(opts, "aws_region", self.aws_default_region);
opts.insert("aws_endpoint".into(), endpoint_override.unwrap_or(&self.aws_s3_endpoint).to_string());
insert_opt!(opts, "AWS_ACCESS_KEY_ID", self.aws_access_key_id);
insert_opt!(opts, "AWS_SECRET_ACCESS_KEY", self.aws_secret_access_key);
insert_opt!(opts, "AWS_REGION", self.aws_default_region);
insert_opt!(opts, "AWS_ALLOW_HTTP", self.aws_allow_http);
opts.insert("AWS_ENDPOINT_URL".into(), endpoint_override.unwrap_or(&self.aws_s3_endpoint).to_string());

if self.is_dynamodb_locking_enabled() {
opts.insert("aws_s3_locking_provider".into(), "dynamodb".into());
insert_opt!(opts, "delta_dynamo_table_name", self.dynamodb.delta_dynamo_table_name);
insert_opt!(opts, "aws_access_key_id_dynamodb", self.dynamodb.aws_access_key_id_dynamodb);
insert_opt!(opts, "aws_secret_access_key_dynamodb", self.dynamodb.aws_secret_access_key_dynamodb);
insert_opt!(opts, "aws_region_dynamodb", self.dynamodb.aws_region_dynamodb);
insert_opt!(opts, "aws_endpoint_url_dynamodb", self.dynamodb.aws_endpoint_url_dynamodb);
opts.insert("AWS_S3_LOCKING_PROVIDER".into(), "dynamodb".into());
insert_opt!(opts, "DELTA_DYNAMO_TABLE_NAME", self.dynamodb.delta_dynamo_table_name);
insert_opt!(opts, "AWS_ACCESS_KEY_ID_DYNAMODB", self.dynamodb.aws_access_key_id_dynamodb);
insert_opt!(opts, "AWS_SECRET_ACCESS_KEY_DYNAMODB", self.dynamodb.aws_secret_access_key_dynamodb);
insert_opt!(opts, "AWS_REGION_DYNAMODB", self.dynamodb.aws_region_dynamodb);
insert_opt!(opts, "AWS_ENDPOINT_URL_DYNAMODB", self.dynamodb.aws_endpoint_url_dynamodb);
}
opts
}
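
The keys above switch to the uppercase, environment-variable-style names that delta-rs and object_store recognize for S3 storage options. A small sketch of the option-map pattern; the `insert_opt!` macro below is a hypothetical stand-in for the crate's own helper, and the endpoint value is purely illustrative:

```rust
use std::collections::HashMap;

// Only values that are actually set get inserted; keys use the uppercase env-style names.
macro_rules! insert_opt {
    ($map:expr, $key:expr, $val:expr) => {
        if let Some(v) = &$val {
            $map.insert($key.to_string(), v.to_string());
        }
    };
}

fn storage_options(access_key: Option<String>, region: Option<String>) -> HashMap<String, String> {
    let mut opts = HashMap::new();
    insert_opt!(opts, "AWS_ACCESS_KEY_ID", access_key);
    insert_opt!(opts, "AWS_REGION", region);
    // Always-present values are inserted directly (illustrative endpoint).
    opts.insert("AWS_ENDPOINT_URL".into(), "http://localhost:9000".into());
    opts
}
```
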
@@ -247,6 +248,8 @@ pub struct BufferConfig {
pub timefusion_wal_corruption_threshold: usize,
#[serde(default = "d_flush_parallelism")]
pub timefusion_flush_parallelism: usize,
#[serde(default)]
pub timefusion_flush_immediately: bool,
}

impl BufferConfig {
Expand All @@ -268,6 +271,9 @@ impl BufferConfig {
pub fn flush_parallelism(&self) -> usize {
self.timefusion_flush_parallelism.max(1)
}
pub fn flush_immediately(&self) -> bool {
self.timefusion_flush_immediately
}

pub fn compute_shutdown_timeout(&self, current_memory_mb: usize) -> Duration {
Duration::from_secs((self.timefusion_shutdown_timeout_secs.max(1) + (current_memory_mb / 100) as u64).min(300))
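
As a worked example of the formula above (assuming the elided remainder of the function simply returns this value): with `timefusion_shutdown_timeout_secs = 20` and roughly 500 MB currently buffered, the timeout is `min(max(20, 1) + 500 / 100, 300)` = 25 seconds, and the `.min(300)` cap keeps shutdown bounded regardless of how much data is buffered.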