diff --git a/Cargo.lock b/Cargo.lock index 39e5d478b61..f69d622d6ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3446,6 +3446,7 @@ dependencies = [ "data_types", "datafusion", "datafusion_util", + "executor", "futures", "futures-util", "hashbrown 0.15.1", diff --git a/Cargo.toml b/Cargo.toml index e91aa1aab23..ead4abddbb1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -127,6 +127,7 @@ authz = { git = "https://github.com/influxdata/influxdb3_core", rev = "7eefba969 clap_blocks = { git = "https://github.com/influxdata/influxdb3_core", rev = "7eefba96954bdfb6fe69b26cd6d1fc9acd8e9160" } data_types = { git = "https://github.com/influxdata/influxdb3_core", rev = "7eefba96954bdfb6fe69b26cd6d1fc9acd8e9160" } datafusion_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "7eefba96954bdfb6fe69b26cd6d1fc9acd8e9160" } +executor = { git = "https://github.com/influxdata/influxdb3_core", rev = "7eefba96954bdfb6fe69b26cd6d1fc9acd8e9160" } influxdb-line-protocol = { git = "https://github.com/influxdata/influxdb3_core", rev = "7eefba96954bdfb6fe69b26cd6d1fc9acd8e9160", features = ["v3"] } influxdb_influxql_parser = { git = "https://github.com/influxdata/influxdb3_core", rev = "7eefba96954bdfb6fe69b26cd6d1fc9acd8e9160" } influxdb_iox_client = { git = "https://github.com/influxdata/influxdb3_core", rev = "7eefba96954bdfb6fe69b26cd6d1fc9acd8e9160" } diff --git a/influxdb3_write/Cargo.toml b/influxdb3_write/Cargo.toml index fb330af045e..e7e00aa75fd 100644 --- a/influxdb3_write/Cargo.toml +++ b/influxdb3_write/Cargo.toml @@ -12,6 +12,7 @@ license.workspace = true # Core Crates data_types.workspace = true datafusion_util.workspace = true +executor.workspace = true influxdb-line-protocol.workspace = true iox_catalog.workspace = true iox_http.workspace = true diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs index a70f884c8a5..d2211f6ad2e 100644 --- a/influxdb3_write/src/write_buffer/queryable_buffer.rs +++ 
b/influxdb3_write/src/write_buffer/queryable_buffer.rs @@ -691,3 +691,159 @@ async fn sort_dedupe_persist( } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::write_buffer::validator::WriteValidator; + use crate::Precision; + use datafusion_util::config::register_iox_object_store; + use executor::{register_current_runtime_for_io, DedicatedExecutor}; + use influxdb3_wal::{Gen1Duration, SnapshotSequenceNumber, WalFileSequenceNumber}; + use iox_query::exec::ExecutorConfig; + use iox_time::{MockProvider, Time, TimeProvider}; + use object_store::memory::InMemory; + use object_store::ObjectStore; + use parquet_file::storage::{ParquetStorage, StorageId}; + use std::num::NonZeroUsize; + + #[tokio::test] + async fn snapshot_works_with_not_all_columns_in_buffer() { + let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new()); + let metrics = Arc::new(metric::Registry::default()); + + let parquet_store = + ParquetStorage::new(Arc::clone(&object_store), StorageId::from("influxdb3")); + let exec = Arc::new(Executor::new_with_config_and_executor( + ExecutorConfig { + target_query_partitions: NonZeroUsize::new(1).unwrap(), + object_stores: [&parquet_store] + .into_iter() + .map(|store| (store.id(), Arc::clone(store.object_store()))) + .collect(), + metric_registry: Arc::clone(&metrics), + // Default to 1gb + mem_pool_size: 1024 * 1024 * 1024, // 1024 (b/kb) * 1024 (kb/mb) * 1024 (mb/gb) + }, + DedicatedExecutor::new_testing(), + )); + let runtime_env = exec.new_context().inner().runtime_env(); + register_iox_object_store(runtime_env, parquet_store.id(), Arc::clone(&object_store)); + register_current_runtime_for_io(); + + let catalog = Arc::new(Catalog::new("hosta".into(), "foo".into())); + let persister = Arc::new(Persister::new(Arc::clone(&object_store), "hosta")); + let time_provider: Arc<dyn TimeProvider> = + Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); + + let queryable_buffer_args = QueryableBufferArgs { + executor: Arc::clone(&exec), + catalog: Arc::clone(&catalog), + 
persister: Arc::clone(&persister), + last_cache_provider: LastCacheProvider::new_from_catalog(Arc::clone(&catalog)).unwrap(), + meta_cache_provider: MetaCacheProvider::new_from_catalog( + Arc::clone(&time_provider), + Arc::clone(&catalog), + ) + .unwrap(), + persisted_files: Arc::new(Default::default()), + parquet_cache: None, + }; + let queryable_buffer = QueryableBuffer::new(queryable_buffer_args); + + let db = data_types::NamespaceName::new("testdb").unwrap(); + + // create the initial write with two tags + let val = WriteValidator::initialize(db.clone(), Arc::clone(&catalog), 0).unwrap(); + let lp = "foo,t1=a,t2=b f1=1i 1000000000"; + + let lines = val + .v1_parse_lines_and_update_schema(lp, false, time_provider.now(), Precision::Nanosecond) + .unwrap() + .convert_lines_to_buffer(Gen1Duration::new_1m()); + let batch: WriteBatch = lines.into(); + let wal_contents = WalContents { + persist_timestamp_ms: 0, + min_timestamp_ns: batch.min_time_ns, + max_timestamp_ns: batch.max_time_ns, + wal_file_number: WalFileSequenceNumber::new(1), + ops: vec![WalOp::Write(batch)], + snapshot: None, + }; + let end_time = + wal_contents.max_timestamp_ns + Gen1Duration::new_1m().as_duration().as_nanos() as i64; + + // write the lp into the buffer + queryable_buffer.notify(wal_contents); + + // now force a snapshot, persisting the data to parquet file. 
Also, buffer up a new write + let snapshot_sequence_number = SnapshotSequenceNumber::new(1); + let snapshot_details = SnapshotDetails { + snapshot_sequence_number: snapshot_sequence_number, + end_time_marker: end_time, + last_wal_sequence_number: WalFileSequenceNumber::new(2), + }; + + // create another write, this time with only one tag, in a different gen1 block + let lp = "foo,t2=b f1=1i 240000000000"; + let val = WriteValidator::initialize(db, Arc::clone(&catalog), 0).unwrap(); + + let lines = val + .v1_parse_lines_and_update_schema(lp, false, time_provider.now(), Precision::Nanosecond) + .unwrap() + .convert_lines_to_buffer(Gen1Duration::new_1m()); + let batch: WriteBatch = lines.into(); + let wal_contents = WalContents { + persist_timestamp_ms: 0, + min_timestamp_ns: batch.min_time_ns, + max_timestamp_ns: batch.max_time_ns, + wal_file_number: WalFileSequenceNumber::new(2), + ops: vec![WalOp::Write(batch)], + snapshot: None, + }; + let end_time = + wal_contents.max_timestamp_ns + Gen1Duration::new_1m().as_duration().as_nanos() as i64; + + let details = queryable_buffer + .notify_and_snapshot(wal_contents, snapshot_details) + .await; + let _details = details.await.unwrap(); + + // validate we have a single persisted file + let db = catalog.db_schema("testdb").unwrap(); + let table = db.table_definition("foo").unwrap(); + let files = queryable_buffer + .persisted_files + .get_files(db.id, table.table_id); + assert_eq!(files.len(), 1); + + // now force another snapshot, persisting the data to parquet file + let snapshot_sequence_number = SnapshotSequenceNumber::new(2); + let snapshot_details = SnapshotDetails { + snapshot_sequence_number: snapshot_sequence_number, + end_time_marker: end_time, + last_wal_sequence_number: WalFileSequenceNumber::new(3), + }; + queryable_buffer + .notify_and_snapshot( + WalContents { + persist_timestamp_ms: 0, + min_timestamp_ns: 0, + max_timestamp_ns: 0, + wal_file_number: WalFileSequenceNumber::new(3), + ops: vec![], + snapshot: 
Some(snapshot_details.clone()), + }, + snapshot_details, + ) + .await + .await + .unwrap(); + + // validate we have two persisted files + let files = queryable_buffer + .persisted_files + .get_files(db.id, table.table_id); + assert_eq!(files.len(), 2); + } +}