2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -211,5 +211,5 @@ large_futures = "warn"
used_underscore_binding = "warn"

[workspace.lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ["cfg(tarpaulin)"] }
unexpected_cfgs = { level = "warn", check-cfg = ["cfg(tarpaulin)", "cfg(tarpaulin_include)"] }
unused_qualifications = "deny"
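
Note on the `check-cfg` addition above: declaring `cfg(tarpaulin_include)` tells rustc that the name is an expected cfg, so coverage-only attributes stop tripping the `unexpected_cfgs` lint. A minimal sketch of how such an attribute is commonly used — the function below is hypothetical and not part of this PR:

// Hypothetical helper marked for exclusion from tarpaulin coverage reports.
// The cfg is never actually set, so the item always compiles; that is exactly
// why rustc needs the check-cfg entry above to know the name is legitimate.
#[cfg(not(tarpaulin_include))]
fn diagnostics_helper(label: &str) {
    eprintln!("diagnostics: {label}");
}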
140 changes: 104 additions & 36 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -104,10 +104,8 @@ pub(crate) mod test_util {
mod tests {

use std::fmt::{self, Display, Formatter};
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use crate::datasource::file_format::parquet::test_util::store_parquet;
@@ -117,7 +115,7 @@ mod tests {
use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};

use arrow::array::RecordBatch;
use arrow_schema::{Schema, SchemaRef};
use arrow_schema::Schema;
use datafusion_catalog::Session;
use datafusion_common::cast::{
as_binary_array, as_binary_view_array, as_boolean_array, as_float32_array,
@@ -137,7 +135,7 @@
};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_execution::{RecordBatchStream, TaskContext};
use datafusion_execution::TaskContext;
use datafusion_expr::dml::InsertOp;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use datafusion_physical_plan::{collect, ExecutionPlan};
@@ -150,7 +148,7 @@
use async_trait::async_trait;
use datafusion_datasource::file_groups::FileGroup;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};
use futures::StreamExt;
use insta::assert_snapshot;
use log::error;
use object_store::local::LocalFileSystem;
@@ -166,6 +164,8 @@
use parquet::format::FileMetaData;
use tokio::fs::File;

use crate::test_util::bounded_stream;

enum ForceViews {
Yes,
No,
@@ -1646,42 +1646,110 @@ mod tests {
Ok(())
}

/// Creates an bounded stream for testing purposes.
fn bounded_stream(
batch: RecordBatch,
limit: usize,
) -> datafusion_execution::SendableRecordBatchStream {
Box::pin(BoundedStream {
count: 0,
limit,
batch,
})
}
#[tokio::test]
async fn test_memory_reservation_column_parallel() -> Result<()> {
async fn test_memory_reservation(global: ParquetOptions) -> Result<()> {
let field_a = Field::new("a", DataType::Utf8, false);
let field_b = Field::new("b", DataType::Utf8, false);
let schema = Arc::new(Schema::new(vec![field_a, field_b]));
let object_store_url = ObjectStoreUrl::local_filesystem();

struct BoundedStream {
limit: usize,
count: usize,
batch: RecordBatch,
}
let file_sink_config = FileSinkConfig {
original_url: String::default(),
object_store_url: object_store_url.clone(),
file_group: FileGroup::new(vec![PartitionedFile::new(
"/tmp".to_string(),
1,
)]),
table_paths: vec![ListingTableUrl::parse("file:///")?],
output_schema: schema.clone(),
table_partition_cols: vec![],
insert_op: InsertOp::Overwrite,
keep_partition_by_columns: false,
file_extension: "parquet".into(),
};
let parquet_sink = Arc::new(ParquetSink::new(
file_sink_config,
TableParquetOptions {
key_value_metadata: std::collections::HashMap::from([
("my-data".to_string(), Some("stuff".to_string())),
("my-data-bool-key".to_string(), None),
]),
global,
..Default::default()
},
));

// create data
let col_a: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar"]));
let col_b: ArrayRef = Arc::new(StringArray::from(vec!["baz", "baz"]));
let batch =
RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)]).unwrap();

impl Stream for BoundedStream {
type Item = Result<RecordBatch>;
// create task context
let task_context = build_ctx(object_store_url.as_ref());
assert_eq!(
task_context.memory_pool().reserved(),
0,
"no bytes are reserved yet"
);

fn poll_next(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
if self.count >= self.limit {
return Poll::Ready(None);
let mut write_task = FileSink::write_all(
parquet_sink.as_ref(),
Box::pin(RecordBatchStreamAdapter::new(
schema,
bounded_stream(batch, 1000),
)),
&task_context,
);

// incrementally poll and check for memory reservation
let mut reserved_bytes = 0;
while futures::poll!(&mut write_task).is_pending() {
reserved_bytes += task_context.memory_pool().reserved();
tokio::time::sleep(Duration::from_micros(1)).await;
}
self.count += 1;
Poll::Ready(Some(Ok(self.batch.clone())))
}
}
assert!(
reserved_bytes > 0,
"should have bytes reserved during write"
);
assert_eq!(
task_context.memory_pool().reserved(),
0,
"no leaking byte reservation"
);

impl RecordBatchStream for BoundedStream {
fn schema(&self) -> SchemaRef {
self.batch.schema()
Ok(())
}

let write_opts = ParquetOptions {
allow_single_file_parallelism: false,
..Default::default()
};
test_memory_reservation(write_opts)
.await
.expect("should track for non-parallel writes");

let row_parallel_write_opts = ParquetOptions {
allow_single_file_parallelism: true,
maximum_parallel_row_group_writers: 10,
maximum_buffered_record_batches_per_stream: 1,
..Default::default()
};
test_memory_reservation(row_parallel_write_opts)
.await
.expect("should track for row-parallel writes");

let col_parallel_write_opts = ParquetOptions {
allow_single_file_parallelism: true,
maximum_parallel_row_group_writers: 1,
maximum_buffered_record_batches_per_stream: 2,
..Default::default()
};
test_memory_reservation(col_parallel_write_opts)
.await
.expect("should track for column-parallel writes");

Ok(())
}
}
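
For readers skimming the new test: `task_context.memory_pool().reserved()` is the pool-wide count of bytes currently held by registered consumers, which is why it must drop back to zero once the write future completes. A standalone sketch of that accounting using the public `datafusion_execution::memory_pool` API — the pool size and consumer name are made up for illustration:

use std::sync::Arc;
use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

fn main() {
    // A fixed-size pool; the parquet sink in the test above instead reserves
    // from the pool carried inside its TaskContext.
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024));
    let mut reservation = MemoryConsumer::new("example-writer").register(&pool);

    reservation.grow(100);
    assert_eq!(pool.reserved(), 100, "bytes are tracked while held");

    drop(reservation); // dropping the reservation returns the bytes to the pool
    assert_eq!(pool.reserved(), 0, "nothing should leak");
}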
1 change: 1 addition & 0 deletions datafusion/core/src/lib.rs
@@ -829,6 +829,7 @@ pub mod test;

mod schema_equivalence;
pub mod test_util;
// pub use test_util::bounded_stream;

#[cfg(doctest)]
doc_comment::doctest!("../../../README.md", readme_example_test);
47 changes: 47 additions & 0 deletions datafusion/core/src/test_util/mod.rs
@@ -22,12 +22,14 @@ pub mod parquet;

pub mod csv;

use futures::Stream;
use std::any::Any;
use std::collections::HashMap;
use std::fs::File;
use std::io::Write;
use std::path::Path;
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::catalog::{TableProvider, TableProviderFactory};
use crate::dataframe::DataFrame;
@@ -38,11 +40,13 @@ use crate::logical_expr::{LogicalPlanBuilder, UNNAMED_TABLE};
use crate::physical_plan::ExecutionPlan;
use crate::prelude::{CsvReadOptions, SessionContext};

use crate::execution::SendableRecordBatchStream;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_catalog::Session;
use datafusion_common::TableReference;
use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType};
use std::pin::Pin;

use async_trait::async_trait;

@@ -52,6 +56,8 @@ use tempfile::TempDir;
pub use datafusion_common::test_util::parquet_test_data;
pub use datafusion_common::test_util::{arrow_test_data, get_data_dir};

use crate::execution::RecordBatchStream;

/// Scan an empty data source, mainly used in tests
pub fn scan_empty(
name: Option<&str>,
@@ -234,3 +240,44 @@ pub fn register_unbounded_file_with_ordering(
ctx.register_table(table_name, Arc::new(StreamTable::new(Arc::new(config))))?;
Ok(())
}

/// Creates a bounded stream that emits the same record batch a specified number of times.
/// This is useful for testing purposes.
pub fn bounded_stream(
record_batch: RecordBatch,
limit: usize,
) -> SendableRecordBatchStream {
Box::pin(BoundedStream {
record_batch,
count: 0,
limit,
})
}

struct BoundedStream {
record_batch: RecordBatch,
count: usize,
limit: usize,
}

impl Stream for BoundedStream {
type Item = Result<RecordBatch, crate::error::DataFusionError>;

fn poll_next(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
if self.count >= self.limit {
Poll::Ready(None)
} else {
self.count += 1;
Poll::Ready(Some(Ok(self.record_batch.clone())))
}
}
}

impl RecordBatchStream for BoundedStream {
fn schema(&self) -> SchemaRef {
self.record_batch.schema()
}
}
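
A brief usage sketch for the helper above (not part of the diff; assumes `tokio`, `arrow`, and `futures` are available, and uses an arbitrary single-column batch): the stream yields the same batch `limit` times and then ends, which is what the parquet sink test relies on to drive repeated writes.

use std::sync::Arc;
use arrow::array::{ArrayRef, Int32Array};
use arrow::record_batch::RecordBatch;
use datafusion::test_util::bounded_stream;
use futures::StreamExt;

#[tokio::main]
async fn main() {
    let col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_from_iter(vec![("c", col)]).unwrap();

    // Emit the batch three times, then the stream terminates.
    let mut stream = bounded_stream(batch, 3);
    let mut seen = 0;
    while let Some(item) = stream.next().await {
        item.expect("bounded_stream only yields Ok batches");
        seen += 1;
    }
    assert_eq!(seen, 3, "stream should end after `limit` batches");
}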
2 changes: 1 addition & 1 deletion datafusion/expr-common/src/groups_accumulator.rs
@@ -21,7 +21,7 @@ use arrow::array::{ArrayRef, BooleanArray};
use datafusion_common::{not_impl_err, Result};

/// Describes how many rows should be emitted during grouping.
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EmitTo {
/// Emit all groups
All,
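
The new `PartialEq, Eq` derive presumably exists so `EmitTo` values can be compared directly, for example in tests. A tiny sketch, assuming the enum remains reachable at `datafusion_expr_common::groups_accumulator::EmitTo` with its `First(usize)` variant:

use datafusion_expr_common::groups_accumulator::EmitTo;

fn main() {
    // Direct equality checks become possible instead of matching on variants.
    assert_eq!(EmitTo::All, EmitTo::All);
    assert_ne!(EmitTo::All, EmitTo::First(10));
}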
3 changes: 3 additions & 0 deletions datafusion/ffi/Cargo.toml
@@ -43,7 +43,9 @@ arrow = { workspace = true, features = ["ffi"] }
async-ffi = { version = "0.5.0", features = ["abi_stable"] }
async-trait = { workspace = true }
datafusion = { workspace = true, default-features = false }
datafusion-functions-aggregate-common = { workspace = true }
datafusion-proto = { workspace = true }
datafusion-proto-common = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
prost = { workspace = true }
@@ -55,3 +57,4 @@ doc-comment = { workspace = true }

[features]
integration-tests = []
tarpaulin_include = [] # Exists only to prevent warnings on stable and still have accurate coverage