Upgrade duckdb to 0.10
brilee committed Mar 11, 2024
1 parent 1bef03e commit e0ec799
Showing 5 changed files with 95 additions and 67 deletions.
14 changes: 10 additions & 4 deletions lilac/data/dataset_duckdb.py
@@ -1787,7 +1787,7 @@ def stats(self, leaf_path: Path, include_deleted: bool = False) -> StatsResult:
     avg_text_length: Optional[int] = None
     if leaf.dtype in (STRING, STRING_SPAN):
       avg_length_query = f"""
-        SELECT avg(length(val))
+        SELECT avg(length(CAST(val AS VARCHAR)))
         FROM (SELECT {inner_select} AS val FROM t {where_clause})
         USING SAMPLE {SAMPLE_AVG_TEXT_LENGTH};
       """
@@ -1835,10 +1835,16 @@ def stats(self, leaf_path: Path, include_deleted: bool = False) -> StatsResult:
       """
     row = self._query(min_max_query)[0]
     result.min_val, result.max_val = row
+    if is_temporal(leaf.dtype):
+      sample_where_clause = ''
+    else:
+      sample_where_clause = (
+        f'WHERE val != 0 {"AND NOT isnan(val)" if is_float(leaf.dtype) else ""}'
+      )
     sample_query = f"""
       SELECT COALESCE(ARRAY_AGG(val), [])
       FROM (SELECT {inner_select} as val FROM t {where_clause})
-      WHERE val != 0 {'AND NOT isnan(val)' if is_float(leaf.dtype) else ''}
+      {sample_where_clause}
       USING SAMPLE 100;
     """
     result.value_samples = list(self._query(sample_query)[0][0])
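
The new is_temporal branch appears to work around DuckDB 0.10's stricter implicit-casting rules: comparing a temporal value against the integer 0 no longer binds. A hedged sketch of that assumption:

  import duckdb

  con = duckdb.connect()
  con.execute("CREATE TABLE ts AS SELECT TIMESTAMP '2024-03-11 00:00:00' AS val")
  try:
    # TIMESTAMP != INTEGER relied on an implicit cast that stricter
    # binder rules are assumed to reject, motivating the guard above.
    con.execute('SELECT val FROM ts WHERE val != 0').fetchall()
  except duckdb.Error as err:
    print(f'temporal comparison rejected: {err}')
  # The temporal branch samples without any WHERE clause instead.
  print(con.execute('SELECT COALESCE(ARRAY_AGG(val), []) FROM ts USING SAMPLE 100').fetchone())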
@@ -1894,9 +1900,9 @@ def select_groups(
     sql_bounds = []
     for label, start, end in named_bins:
       if start is None:
-        start = cast(float, "'-Infinity'")
+        start = cast(float, "'-Infinity'::FLOAT")
       if end is None:
-        end = cast(float, "'Infinity'")
+        end = cast(float, "'Infinity'::FLOAT")
       sql_bounds.append(f"('{label}', {start}, {end})")
 
     bin_index_col = 'col0'
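
The explicit ::FLOAT on the sentinel bounds looks like the same casting tightening: a bare '-Infinity' string literal is no longer coerced to a number inside the bin comparison. A small sketch under that assumption:

  import duckdb

  con = duckdb.connect()
  # With the explicit cast the IEEE special values parse as FLOAT and
  # compare numerically rather than relying on an implicit conversion.
  print(con.execute("SELECT 42.0 BETWEEN '-Infinity'::FLOAT AND 'Infinity'::FLOAT").fetchone())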
4 changes: 2 additions & 2 deletions lilac/data/dataset_test.py
@@ -177,8 +177,8 @@ def test_select_star(make_test_data: TestDataMaker) -> None:
   # Select * plus an inner `info.age` column.
   result = dataset.select_rows(['*', ('info', 'age')])
   assert list(result) == [
-    {'info.age': 40, 'info.age_2': 40, 'name': 'A'},
-    {'info.age': 42, 'info.age_2': 42, 'name': 'B'},
+    {'info.age': 40, 'info.age_1': 40, 'name': 'A'},
+    {'info.age': 42, 'info.age_1': 42, 'name': 'B'},
   ]
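
The only change here is the suffix of the deduplicated duplicate column: after the upgrade the second `info.age` projection surfaces as `info.age_1` rather than `info.age_2`, which appears to track a renaming-scheme change for duplicate output columns in DuckDB 0.10.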


23 changes: 13 additions & 10 deletions lilac/sources/csv_source.py
@@ -60,28 +60,31 @@ def setup(self) -> None:
 
     # NOTE: We use duckdb here to increase parallelism for multiple files.
     # NOTE: We turn off the parallel reader because of https://github.com/lilacai/lilac/issues/373.
-    self._con.execute(
-      f"""
-      CREATE SEQUENCE serial START 1;
-      CREATE VIEW t as (SELECT nextval('serial') as "{LINE_NUMBER_COLUMN}", * FROM read_csv_auto(
+    csv_source_sql = f"""read_csv_auto(
       {duckdb_paths},
       SAMPLE_SIZE=500000,
       HEADER={self.header},
       {f'NAMES={self.names},' if self.names else ''}
       DELIM='{self.delim or ','}',
       IGNORE_ERRORS=true,
       PARALLEL=false
-    ));
-    """
-    )
-
-    res = self._con.execute('SELECT COUNT(*) FROM t').fetchone()
+    )"""
+    res = self._con.execute(f'SELECT COUNT(*) FROM {csv_source_sql}').fetchone()
     num_items = cast(tuple[int], res)[0]
 
-    self._reader = self._con.execute('SELECT * from t').fetch_record_batch(rows_per_batch=10_000)
+    self._reader = self._con.execute(
+      f'SELECT 1::BIGINT as "{LINE_NUMBER_COLUMN}", * from {csv_source_sql}'
+    ).fetch_record_batch(rows_per_batch=10_000)
     # Create the source schema in prepare to share it between process and source_schema.
     schema = arrow_schema_to_schema(self._reader.schema)
     self._source_schema = SourceSchema(fields=schema.fields, num_items=num_items)
+    self._con.execute(
+      f"""
+      CREATE SEQUENCE serial START 1;
+      CREATE VIEW t as (
+        SELECT nextval('serial') as "{LINE_NUMBER_COLUMN}", * FROM {csv_source_sql});
+      """
+    )
 
   @override
   def source_schema(self) -> SourceSchema:
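
Read end to end, the new setup builds the read_csv_auto call once as a SQL fragment, runs COUNT(*) and the batch reader directly against it, and only then creates the sequence-backed view, so counting appears unable to advance the sequence before the view is scanned. A condensed sketch of that flow, assuming a local data.csv and a hypothetical __line_number__ name in place of LINE_NUMBER_COLUMN:

  import duckdb

  con = duckdb.connect()
  csv_source_sql = "read_csv_auto('data.csv', IGNORE_ERRORS=true, PARALLEL=false)"

  # COUNT(*) runs straight against the CSV scan; no sequence exists yet.
  num_items = con.execute(f'SELECT COUNT(*) FROM {csv_source_sql}').fetchone()[0]

  # The batch reader fills the line-number slot with a constant of the
  # right type; only the schema and row payloads are needed here.
  reader = con.execute(
    f'SELECT 1::BIGINT AS "__line_number__", * FROM {csv_source_sql}'
  ).fetch_record_batch(rows_per_batch=10_000)

  # The sequence and view are created last, so later scans of the view
  # hand out line numbers starting from 1.
  con.execute(f"""
    CREATE SEQUENCE serial START 1;
    CREATE VIEW t AS (
      SELECT nextval('serial') AS "__line_number__", * FROM {csv_source_sql});
  """)
  print(num_items, reader.schema.names)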
(Diffs for the remaining 2 changed files did not load.)
