From e958883bf77ec5fc34dcfff5335e37d2d08f33bd Mon Sep 17 00:00:00 2001
From: montanarograziano
Date: Sat, 31 Aug 2024 15:00:34 +0200
Subject: [PATCH] feat: add q4 implementation

---
 tpch/execute/q4.py | 31 +++++++++++++++++++++++++++++++
 tpch/queries/q4.py | 51 +++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 82 insertions(+)
 create mode 100644 tpch/execute/q4.py
 create mode 100644 tpch/queries/q4.py

diff --git a/tpch/execute/q4.py b/tpch/execute/q4.py
new file mode 100644
index 000000000..672a43e17
--- /dev/null
+++ b/tpch/execute/q4.py
@@ -0,0 +1,31 @@
+"""Run TPC-H query 4 against every supported backend and print each result."""
+
+from pathlib import Path
+
+import pandas as pd
+import polars as pl
+from queries import q4
+
+pd.options.mode.copy_on_write = True
+pd.options.future.infer_string = True
+
+line_item = Path("data") / "lineitem.parquet"
+orders = Path("data") / "orders.parquet"
+
+# Backend label -> parquet reader producing that backend's native frame.
+IO_FUNCS = {
+    "pandas": lambda x: pd.read_parquet(x, engine="pyarrow"),
+    "pandas[pyarrow]": lambda x: pd.read_parquet(
+        x, engine="pyarrow", dtype_backend="pyarrow"
+    ),
+    "polars[eager]": lambda x: pl.read_parquet(x),
+    "polars[lazy]": lambda x: pl.scan_parquet(x),
+}
+
+# Dicts preserve insertion order, so backends run in the order listed above.
+for fn in IO_FUNCS.values():
+    result = q4.query(fn(line_item), fn(orders))
+    # A polars LazyFrame is only a query plan until it is collected.
+    if isinstance(result, pl.LazyFrame):
+        result = result.collect()
+    print(result)
diff --git a/tpch/queries/q4.py b/tpch/queries/q4.py
new file mode 100644
index 000000000..7247004fe
--- /dev/null
+++ b/tpch/queries/q4.py
@@ -0,0 +1,51 @@
+"""TPC-H query 4 (order priority checking), written once against narwhals."""
+
+from datetime import datetime
+
+import narwhals as nw
+from narwhals.typing import FrameT
+
+
+@nw.narwhalify
+def query(
+    line_item_ds_raw: FrameT,
+    orders_ds_raw: FrameT,
+) -> FrameT:
+    """Count, per order priority, the orders with at least one late line item.
+
+    An order is counted when its ``o_orderdate`` falls in
+    [1993-07-01, 1993-10-01) and at least one of its line items has
+    ``l_commitdate`` earlier than ``l_receiptdate``.
+
+    ``nw.narwhalify`` wraps the native inputs as narwhals frames on the way in
+    and unwraps the result on the way out, so no explicit
+    ``nw.from_native`` / ``nw.to_native`` calls are needed here.
+
+    Args:
+        line_item_ds_raw: The ``lineitem`` table.
+        orders_ds_raw: The ``orders`` table.
+
+    Returns:
+        A frame with columns ``o_orderpriority`` and ``order_count`` (Int64),
+        sorted by ``o_orderpriority``.
+    """
+    var_1 = datetime(1993, 7, 1)
+    var_2 = datetime(1993, 10, 1)
+
+    return (
+        line_item_ds_raw.join(
+            orders_ds_raw, left_on="l_orderkey", right_on="o_orderkey"
+        )
+        .filter(
+            # closed="left" keeps var_1 and excludes var_2.
+            nw.col("o_orderdate").is_between(var_1, var_2, closed="left"),
+            nw.col("l_commitdate") < nw.col("l_receiptdate"),
+        )
+        # Collapse to one row per order so an order with several late line
+        # items is counted once.
+        .unique(subset=["o_orderpriority", "l_orderkey"])
+        .group_by("o_orderpriority")
+        .agg(nw.len().alias("order_count"))
+        .sort(by="o_orderpriority")
+        .with_columns(nw.col("order_count").cast(nw.Int64))
+    )