Upgrade to support Python 3.11 #380

Closed · wants to merge 4 commits
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
@@ -47,7 +47,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.10"]
python-version: ["3.11"]
os: [macos-latest, windows-latest]
steps:
- uses: actions/checkout@v3
@@ -106,7 +106,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.10"]
python-version: ["3.11"]
steps:
- uses: actions/checkout@v3

2 changes: 1 addition & 1 deletion .github/workflows/conda.yml
@@ -72,7 +72,7 @@ jobs:
with:
miniforge-variant: Mambaforge
use-mamba: true
python-version: "3.8"
python-version: "3.11"
channel-priority: strict
- name: Install dependencies
run: |
2 changes: 1 addition & 1 deletion .github/workflows/dev.yml
@@ -29,6 +29,6 @@ jobs:
- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: "3.10"
python-version: "3.11"
- name: Audit licenses
run: ./dev/release/run-rat.sh .
8 changes: 4 additions & 4 deletions .github/workflows/docs.yaml
@@ -35,21 +35,21 @@ jobs:
- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: "3.10"
python-version: "3.11"

- name: Install Protoc
uses: arduino/setup-protoc@v1
with:
version: '3.x'
version: "3.x"
repo-token: ${{ secrets.GITHUB_TOKEN }}

- name: Install dependencies
run: |
set -x
python3 -m venv venv
source venv/bin/activate
pip install -r requirements-310.txt
pip install -r docs/requirements.txt
pip install --require-hashes --no-deps -r requirements.txt
pip install --require-hashes --no-deps -r docs/requirements.txt
- name: Build Datafusion
run: |
set -x
30 changes: 16 additions & 14 deletions .github/workflows/test.yaml
@@ -33,15 +33,13 @@ jobs:
fail-fast: false
matrix:
python-version:
- "3.7"
- "3.8"
- "3.9"
- "3.10"
- "3.11"
toolchain:
- "stable"
# we are not that much eager in walking on the edge yet
# - nightly
# build stable for only 3.7
include:
- python-version: "3.7"
toolchain: "stable"
steps:
- uses: actions/checkout@v3

@@ -55,7 +53,7 @@
- name: Install Protoc
uses: arduino/setup-protoc@v1
with:
version: '3.x'
version: "3.x"
repo-token: ${{ secrets.GITHUB_TOKEN }}

- name: Setup Python
@@ -71,34 +69,38 @@

- name: Check Formatting
uses: actions-rs/cargo@v1
if: ${{ matrix.python-version == '3.10' && matrix.toolchain == 'stable' }}
if: ${{ matrix.python-version == '3.11' && matrix.toolchain == 'stable' }}
with:
command: fmt
args: -- --check

- name: Run Clippy
uses: actions-rs/cargo@v1
if: ${{ matrix.python-version == '3.10' && matrix.toolchain == 'stable' }}
if: ${{ matrix.python-version == '3.11' && matrix.toolchain == 'stable' }}
with:
command: clippy
args: --all-targets --all-features -- -D clippy::all -A clippy::redundant_closure

- name: Create Virtualenv (3.10)
if: ${{ matrix.python-version == '3.10' }}
- name: Create Virtualenv (>= 3.8)
if: ${{ matrix.python-version != '3.7' }}
run: |
python -m venv venv
source venv/bin/activate
pip install -r requirements-310.txt
pip install -U pip
# only required on versions < 3.11 because of Pytest 7
pip install 'exceptiongroup>=1.0.0rc8;python_version<"3.11"'
pip install --require-hashes --no-deps -r requirements.txt

- name: Create Virtualenv (3.7)
if: ${{ matrix.python-version == '3.7' }}
run: |
python -m venv venv
source venv/bin/activate
pip install -r requirements-37.txt
pip install -U pip
pip install --require-hashes --no-deps -r requirements-37.txt

- name: Run Python Linters
if: ${{ matrix.python-version == '3.10' && matrix.toolchain == 'stable' }}
if: ${{ matrix.python-version == '3.11' && matrix.toolchain == 'stable' }}
run: |
source venv/bin/activate
flake8 --exclude venv,benchmarks/db-benchmark --ignore=E501,W503
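The `exceptiongroup` marker added above backfills `ExceptionGroup` for pytest 7 on interpreters older than 3.11, where it became a built-in (PEP 654). A minimal sketch, not part of this PR, of how the backport and the built-in line up:

```python
import sys

if sys.version_info >= (3, 11):
    ExcGroup = ExceptionGroup  # built-in since Python 3.11
else:
    from exceptiongroup import ExceptionGroup as ExcGroup  # backport used by pytest 7

try:
    raise ExcGroup("two failures", [ValueError("a"), KeyError("b")])
except Exception as eg:
    # ExceptionGroup subclasses Exception, so this handler works on both versions
    print(type(eg).__name__, [type(e).__name__ for e in eg.exceptions])
```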
6 changes: 3 additions & 3 deletions README.md
@@ -202,7 +202,7 @@ source venv/bin/activate
# update pip itself if necessary
python -m pip install -U pip
# install dependencies (for Python 3.8+)
python -m pip install -r requirements-310.txt
python -m pip install --require-hashes --no-deps -r requirements.txt
```

The tests rely on test data in git submodules.
@@ -235,13 +235,13 @@ To change test dependencies, change the `requirements.in` and run
```bash
# install pip-tools (this can be done only once), also consider running in venv
python -m pip install pip-tools
python -m piptools compile --generate-hashes -o requirements-310.txt
python -m piptools compile --generate-hashes -o requirements.txt
```

To update dependencies, run with `-U`

```bash
python -m piptools compile -U --generate-hashes -o requirements-310.txt
python -m piptools compile -U --generate-hashes -o requirements.txt
```

More details [here](https://github.com/jazzband/pip-tools)
16 changes: 4 additions & 12 deletions benchmarks/db-benchmark/groupby-datafusion.py
@@ -79,17 +79,13 @@ def execute(df):

data = pacsv.read_csv(
src_grp,
convert_options=pacsv.ConvertOptions(
auto_dict_encode=True, column_types=schema
),
convert_options=pacsv.ConvertOptions(auto_dict_encode=True, column_types=schema),
)
print("dataset loaded")

# create a session context with explicit runtime and config settings
runtime = (
RuntimeConfig()
.with_disk_manager_os()
.with_fair_spill_pool(64 * 1024 * 1024 * 1024)
RuntimeConfig().with_disk_manager_os().with_fair_spill_pool(64 * 1024 * 1024 * 1024)
)
config = (
SessionConfig()
@@ -116,9 +112,7 @@ def execute(df):
if sql:
df = ctx.sql("SELECT id1, SUM(v1) AS v1 FROM x GROUP BY id1")
else:
df = ctx.table("x").aggregate(
[f.col("id1")], [f.sum(f.col("v1")).alias("v1")]
)
df = ctx.table("x").aggregate([f.col("id1")], [f.sum(f.col("v1")).alias("v1")])
ans = execute(df)

shape = ans_shape(ans)
@@ -197,9 +191,7 @@ def execute(df):
gc.collect()
t_start = timeit.default_timer()
if sql:
df = ctx.sql(
"SELECT id3, SUM(v1) AS v1, AVG(v3) AS v3 FROM x GROUP BY id3"
)
df = ctx.sql("SELECT id3, SUM(v1) AS v1, AVG(v3) AS v3 FROM x GROUP BY id3")
else:
df = ctx.table("x").aggregate(
[f.col("id3")],
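For readers unfamiliar with the API being reformatted here, a minimal sketch of the same `aggregate` call, assuming the datafusion-python surface of this era (`SessionContext`, `create_dataframe`, `col`, and the `functions` module):

```python
import pyarrow as pa
from datafusion import SessionContext, col, functions as f

ctx = SessionContext()
batch = pa.RecordBatch.from_arrays(
    [pa.array(["a", "b", "a"]), pa.array([1, 2, 3])],
    names=["id1", "v1"],
)
# create_dataframe takes a list of partitions, each a list of RecordBatches
df = ctx.create_dataframe([[batch]])
result = df.aggregate([col("id1")], [f.sum(col("v1")).alias("v1")]).collect()
print(result)
```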
24 changes: 4 additions & 20 deletions benchmarks/db-benchmark/join-datafusion.py
@@ -152,11 +152,7 @@ def ans_shape(batches):
print(f"q2: {t}")
t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])
chk = (
df.aggregate([], [f.sum(col("v1")), f.sum(col("v2"))])
.collect()[0]
.column(0)[0]
)
chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v2"))]).collect()[0].column(0)[0]
chkt = timeit.default_timer() - t_start
m = memory_usage()
write_log(
@@ -193,11 +189,7 @@ def ans_shape(batches):
print(f"q3: {t}")
t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])
chk = (
df.aggregate([], [f.sum(col("v1")), f.sum(col("v2"))])
.collect()[0]
.column(0)[0]
)
chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v2"))]).collect()[0].column(0)[0]
chkt = timeit.default_timer() - t_start
m = memory_usage()
write_log(
@@ -234,11 +226,7 @@ def ans_shape(batches):
print(f"q4: {t}")
t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])
chk = (
df.aggregate([], [f.sum(col("v1")), f.sum(col("v2"))])
.collect()[0]
.column(0)[0]
)
chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v2"))]).collect()[0].column(0)[0]
chkt = timeit.default_timer() - t_start
m = memory_usage()
write_log(
@@ -275,11 +263,7 @@ def ans_shape(batches):
print(f"q5: {t}")
t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])
chk = (
df.aggregate([], [f.sum(col("v1")), f.sum(col("v2"))])
.collect()[0]
.column(0)[0]
)
chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v2"))]).collect()[0].column(0)[0]
chkt = timeit.default_timer() - t_start
m = memory_usage()
write_log(
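Every collapsed `chk` line above follows the same pattern: aggregate to one row, `collect()` a list of RecordBatches, then take the first value of the first column. A pyarrow-only sketch of that final indexing step (values and column names invented):

```python
import pyarrow as pa

# stand-in for df.aggregate(...).collect(): one batch, one row
batches = [
    pa.RecordBatch.from_arrays(
        [pa.array([6]), pa.array([9])], names=["SUM(v1)", "SUM(v2)"]
    )
]
chk = batches[0].column(0)[0]  # first column, first row -> pyarrow scalar
print(chk.as_py())  # 6
```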
4 changes: 1 addition & 3 deletions benchmarks/tpch/tpch.py
@@ -83,9 +83,7 @@ def bench(data_path, query_path):
time_millis = (end - start) * 1000
total_time_millis += time_millis
print("q{},{}".format(query, round(time_millis, 1)))
results.write(
"q{},{}\n".format(query, round(time_millis, 1))
)
results.write("q{},{}\n".format(query, round(time_millis, 1)))
results.flush()
except Exception as e:
print("query", query, "failed", e)
4 changes: 1 addition & 3 deletions datafusion/__init__.py
@@ -208,9 +208,7 @@ def udaf(accum, input_type, return_type, state_type, volatility, name=None):
Create a new User Defined Aggregate Function
"""
if not issubclass(accum, Accumulator):
raise TypeError(
"`accum` must implement the abstract base class Accumulator"
)
raise TypeError("`accum` must implement the abstract base class Accumulator")
if name is None:
name = accum.__qualname__.lower()
return AggregateUDF(
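For context, a hedged sketch of an accumulator that satisfies the type check above, using the `Accumulator` ABC and the `udaf` signature shown in this diff (illustrative, not this repo's canonical example):

```python
import pyarrow as pa
import pyarrow.compute as pc
from datafusion import Accumulator, udaf  # assumes both are exported by this module

class MySum(Accumulator):
    """Sums a float64 column."""

    def __init__(self):
        self._sum = 0.0

    def update(self, values: pa.Array) -> None:
        self._sum += pc.sum(values).as_py() or 0.0

    def merge(self, states: pa.Array) -> None:
        self._sum += pc.sum(states).as_py() or 0.0

    def state(self) -> list:
        return [pa.scalar(self._sum)]

    def evaluate(self) -> pa.Scalar:
        return pa.scalar(self._sum)

# positional arguments per the signature above:
# (accum, input_type, return_type, state_type, volatility, name=None)
my_sum = udaf(MySum, pa.float64(), pa.float64(), [pa.float64()], "stable")
```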
4 changes: 1 addition & 3 deletions datafusion/cudf.py
@@ -68,9 +68,7 @@ def to_cudf_df(self, plan):
elif isinstance(node, TableScan):
return cudf.read_parquet(self.parquet_tables[node.table_name()])
else:
raise Exception(
"unsupported logical operator: {}".format(type(node))
)
raise Exception("unsupported logical operator: {}".format(type(node)))

def create_schema(self, schema_name: str, **kwargs):
logger.debug(f"Creating schema: {schema_name}")
8 changes: 2 additions & 6 deletions datafusion/input/base.py
@@ -31,13 +31,9 @@ class BaseInputSource(ABC):
"""

@abstractmethod
def is_correct_input(
self, input_item: Any, table_name: str, **kwargs
) -> bool:
def is_correct_input(self, input_item: Any, table_name: str, **kwargs) -> bool:
pass

@abstractmethod
def build_table(
self, input_item: Any, table_name: str, **kwarg
) -> SqlTable:
def build_table(self, input_item: Any, table_name: str, **kwarg) -> SqlTable:
pass
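A hypothetical subclass sketch of the ABC above (class name and predicate are invented; `build_table` is elided because real table construction lives in `datafusion/input/location.py`):

```python
from typing import Any

from datafusion.input.base import BaseInputSource

class ParquetPathInput(BaseInputSource):
    """Accepts only string paths ending in .parquet (illustrative only)."""

    def is_correct_input(self, input_item: Any, table_name: str, **kwargs) -> bool:
        return isinstance(input_item, str) and input_item.endswith(".parquet")

    def build_table(self, input_item: Any, table_name: str, **kwarg):
        raise NotImplementedError("sketch only; see datafusion/input/location.py")
```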
4 changes: 1 addition & 3 deletions datafusion/input/location.py
@@ -72,9 +72,7 @@ def build_table(
for _ in reader:
num_rows += 1
# TODO: Need to actually consume this row into reasonable columns
raise RuntimeError(
"TODO: Currently unable to support CSV input files."
)
raise RuntimeError("TODO: Currently unable to support CSV input files.")
else:
raise RuntimeError(
f"Input of format: `{format}` is currently not supported.\
4 changes: 1 addition & 3 deletions datafusion/pandas.py
@@ -64,9 +64,7 @@ def to_pandas_df(self, plan):
elif isinstance(node, TableScan):
return pd.read_parquet(self.parquet_tables[node.table_name()])
else:
raise Exception(
"unsupported logical operator: {}".format(type(node))
)
raise Exception("unsupported logical operator: {}".format(type(node)))

def create_schema(self, schema_name: str, **kwargs):
logger.debug(f"Creating schema: {schema_name}")
12 changes: 3 additions & 9 deletions datafusion/polars.py
@@ -51,9 +51,7 @@ def to_polars_df(self, plan):
args = [self.to_polars_expr(expr) for expr in node.projections()]
return inputs[0].select(*args)
elif isinstance(node, Aggregate):
groupby_expr = [
self.to_polars_expr(expr) for expr in node.group_by_exprs()
]
groupby_expr = [self.to_polars_expr(expr) for expr in node.group_by_exprs()]
aggs = []
for expr in node.aggregate_exprs():
expr = expr.to_variant()
@@ -67,17 +65,13 @@
)
)
else:
raise Exception(
"Unsupported aggregate function {}".format(expr)
)
raise Exception("Unsupported aggregate function {}".format(expr))
df = inputs[0].groupby(groupby_expr).agg(aggs)
return df
elif isinstance(node, TableScan):
return polars.read_parquet(self.parquet_tables[node.table_name()])
else:
raise Exception(
"unsupported logical operator: {}".format(type(node))
)
raise Exception("unsupported logical operator: {}".format(type(node)))

def create_schema(self, schema_name: str, **kwargs):
logger.debug(f"Creating schema: {schema_name}")
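For reference, a tiny sketch of the call the translator builds above, using the `groupby`/`agg` names current in polars when this PR was written (later polars renames `groupby` to `group_by`):

```python
import polars as pl

df = pl.DataFrame({"id1": ["a", "b", "a"], "v1": [1, 2, 3]})
out = df.groupby("id1").agg([pl.col("v1").sum().alias("v1")])
print(out)
```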
12 changes: 3 additions & 9 deletions datafusion/tests/generic.py
@@ -50,9 +50,7 @@ def data_datetime(f):
datetime.datetime.now() - datetime.timedelta(days=1),
datetime.datetime.now() + datetime.timedelta(days=1),
]
return pa.array(
data, type=pa.timestamp(f), mask=np.array([False, True, False])
)
return pa.array(data, type=pa.timestamp(f), mask=np.array([False, True, False]))


def data_date32():
@@ -61,9 +59,7 @@ def data_date32():
datetime.date(1980, 1, 1),
datetime.date(2030, 1, 1),
]
return pa.array(
data, type=pa.date32(), mask=np.array([False, True, False])
)
return pa.array(data, type=pa.date32(), mask=np.array([False, True, False]))


def data_timedelta(f):
@@ -72,9 +68,7 @@ def data_timedelta(f):
datetime.timedelta(days=1),
datetime.timedelta(seconds=1),
]
return pa.array(
data, type=pa.duration(f), mask=np.array([False, True, False])
)
return pa.array(data, type=pa.duration(f), mask=np.array([False, True, False]))


def data_binary_other():
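All three helpers above use the same pyarrow masking idiom; a standalone sketch (a `True` slot in the mask becomes null):

```python
import numpy as np
import pyarrow as pa

arr = pa.array([1, 2, 3], type=pa.int64(), mask=np.array([False, True, False]))
print(arr)  # [1, null, 3] -- the masked middle element is null
```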