Skip to content

Commit d0b514b

Browse files
authored
Merge pull request #366 from epoch8/deprecate-py39
Deprecate python3.9
2 parents f078e11 + 5f9e058 commit d0b514b

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+444
-528
lines changed

.github/workflows/test.yaml

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,23 +26,9 @@ jobs:
2626
strategy:
2727
matrix:
2828
include:
29-
# - python-version: "3.8"
30-
# test-db-env: "postgres"
31-
# pip-extra: "sqlalchemy <2"
32-
# - python-version: "3.8"
33-
# test-db-env: "postgres"
34-
# pip-extra: "'sqlalchemy>2'"
35-
# - python-version: "3.8"
36-
# test-db-env: "sqlite"
37-
38-
- python-version: "3.9"
29+
- python-version: "3.10"
3930
test-db-env: "postgres"
4031
pip-extra: '"sqlalchemy>2"'
41-
# - python-version: "3.9"
42-
# test-db-env: "sqlite"
43-
44-
# - python-version: "3.10"
45-
# test-db-env: "postgres"
4632
# - python-version: "3.10"
4733
# test-db-env: "sqlite"
4834

.github/workflows/test_examples.yaml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@ jobs:
2525
strategy:
2626
matrix:
2727
python-version:
28-
# - "3.8"
29-
# - "3.9"
3028
# - "3.10"
3129
# - "3.11"
3230
- "3.12"

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ See [key-mapping.md](design-docs/2025-12-key-mapping.md) for motivation
2323
joining tables with different key names
2424
* Added `DataField` accessor for `InputSpec.keys`
2525

26+
### Python3.9 support is deprecated
27+
2628
## CLI improvements:
2729
* Make CLI accept multiple `--name` values
2830

datapipe/cli.py

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import importlib.metadata as metadata
2-
import os.path
2+
import os
33
import sys
44
import time
5-
from typing import Dict, List, Optional, cast
5+
from typing import cast
66

77
import click
88
import pandas as pd
@@ -49,7 +49,7 @@ def load_pipeline(pipeline_name: str) -> DatapipeApp:
4949
return app
5050

5151

52-
def parse_labels(labels: Optional[str]) -> Labels:
52+
def parse_labels(labels: str | None) -> Labels:
5353
if labels is None or labels == "":
5454
return []
5555

@@ -68,8 +68,8 @@ def parse_labels(labels: Optional[str]) -> Labels:
6868
def filter_steps_by_labels_and_name(
6969
app: DatapipeApp,
7070
labels: Labels = [],
71-
name_prefix: Optional[str] = None,
72-
) -> List[ComputeStep]:
71+
name_prefix: str | None = None,
72+
) -> list[ComputeStep]:
7373
res = []
7474

7575
name_prefixes = None if name_prefix is None else [p.strip() for p in name_prefix.split(",")]
@@ -311,7 +311,7 @@ def step(
311311
ctx.obj["steps"] = steps
312312

313313

314-
def to_human_repr(step: ComputeStep, extra_args: Optional[Dict] = None) -> str:
314+
def to_human_repr(step: ComputeStep, extra_args: dict | None = None) -> str:
315315
res = []
316316

317317
res.append(f"[green][bold]{step.name}[/bold][/green] ({step.__class__.__name__})")
@@ -340,7 +340,7 @@ def to_human_repr(step: ComputeStep, extra_args: Optional[Dict] = None) -> str:
340340
@click.pass_context
341341
def step_list(ctx: click.Context, status: bool) -> None: # noqa
342342
app: DatapipeApp = ctx.obj["pipeline"]
343-
steps: List[ComputeStep] = ctx.obj["steps"]
343+
steps: list[ComputeStep] = ctx.obj["steps"]
344344

345345
for step in steps:
346346
extra_args = {}
@@ -365,7 +365,7 @@ def step_list(ctx: click.Context, status: bool) -> None: # noqa
365365
@click.pass_context
366366
def step_run(ctx: click.Context, loop: bool, loop_delay: int) -> None:
367367
app: DatapipeApp = ctx.obj["pipeline"]
368-
steps_to_run: List[ComputeStep] = ctx.obj["steps"]
368+
steps_to_run: list[ComputeStep] = ctx.obj["steps"]
369369

370370
executor: Executor = ctx.obj["executor"]
371371

@@ -389,7 +389,7 @@ def step_run(ctx: click.Context, loop: bool, loop_delay: int) -> None:
389389
@click.pass_context
390390
def run_idx(ctx: click.Context, idx: str) -> None:
391391
app: DatapipeApp = ctx.obj["pipeline"]
392-
steps_to_run: List[ComputeStep] = ctx.obj["steps"]
392+
steps_to_run: list[ComputeStep] = ctx.obj["steps"]
393393
steps_to_run_names = [f"'{i.name}'" for i in steps_to_run]
394394
print(f"Running following steps: {', '.join(steps_to_run_names)}")
395395

@@ -411,11 +411,11 @@ def run_changelist(
411411
start_step: str,
412412
loop: bool,
413413
loop_delay: int,
414-
chunk_size: Optional[int] = None,
414+
chunk_size: int | None = None,
415415
) -> None:
416416
app: DatapipeApp = ctx.obj["pipeline"]
417417

418-
steps_to_run: List[ComputeStep] = ctx.obj["steps"]
418+
steps_to_run: list[ComputeStep] = ctx.obj["steps"]
419419
if start_step is not None:
420420
start_step_objs = filter_steps_by_labels_and_name(app, labels=[], name_prefix=start_step)
421421
assert len(start_step_objs) == 1
@@ -465,7 +465,7 @@ def run_changelist(
465465
@click.pass_context
466466
def fill_metadata(ctx: click.Context) -> None:
467467
app: DatapipeApp = ctx.obj["pipeline"]
468-
steps_to_run: List[ComputeStep] = ctx.obj["steps"]
468+
steps_to_run: list[ComputeStep] = ctx.obj["steps"]
469469
steps_to_run_names = [f"'{i.name}'" for i in steps_to_run]
470470

471471
for step in steps_to_run:
@@ -478,7 +478,7 @@ def fill_metadata(ctx: click.Context) -> None:
478478
@click.pass_context
479479
def reset_metadata(ctx: click.Context) -> None: # noqa
480480
app: DatapipeApp = ctx.obj["pipeline"]
481-
steps_to_run: List[ComputeStep] = ctx.obj["steps"]
481+
steps_to_run: list[ComputeStep] = ctx.obj["steps"]
482482
steps_to_run_names = [f"'{i.name}'" for i in steps_to_run]
483483
print(f"Resetting following steps: {', '.join(steps_to_run_names)}")
484484

datapipe/compute.py

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import logging
33
from abc import ABC, abstractmethod
44
from dataclasses import dataclass
5-
from typing import Dict, Iterable, List, Literal, Optional, Sequence, Tuple, Union
5+
from typing import Iterable, Literal, Sequence
66

77
from opentelemetry import trace
88
from sqlalchemy import Column
@@ -21,13 +21,13 @@
2121
@dataclass
2222
class Table:
2323
store: TableStore
24-
name: Optional[str] = None
24+
name: str | None = None
2525

2626

2727
class Catalog:
28-
def __init__(self, catalog: Dict[str, Table]):
28+
def __init__(self, catalog: dict[str, Table]):
2929
self.catalog = catalog
30-
self.data_tables: Dict[str, DataTable] = {}
30+
self.data_tables: dict[str, DataTable] = {}
3131

3232
def add_datatable(self, name: str, dt: Table):
3333
self.catalog[name] = dt
@@ -94,10 +94,10 @@ class ComputeInput:
9494
#
9595
# Example: {"idx_col": "meta_col"} means that to get idx_col value
9696
# we should read meta_col from meta table
97-
keys: Optional[Dict[str, FieldAccessor]] = None
97+
keys: dict[str, FieldAccessor] | None = None
9898

9999
@property
100-
def primary_keys(self) -> List[str]:
100+
def primary_keys(self) -> list[str]:
101101
if self.keys:
102102
return list(self.keys.keys())
103103
else:
@@ -152,10 +152,10 @@ class ComputeStep:
152152
def __init__(
153153
self,
154154
name: str,
155-
input_dts: Sequence[Union[ComputeInput, DataTable]],
156-
output_dts: List[DataTable],
157-
labels: Optional[Labels] = None,
158-
executor_config: Optional[ExecutorConfig] = None,
155+
input_dts: Sequence[ComputeInput | DataTable],
156+
output_dts: list[DataTable],
157+
labels: Labels | None = None,
158+
executor_config: ExecutorConfig | None = None,
159159
) -> None:
160160
self._name = name
161161
# Нормализация input_dts: автоматически оборачиваем DataTable в ComputeInput
@@ -222,41 +222,41 @@ def validate(self) -> None:
222222
def get_full_process_ids(
223223
self,
224224
ds: DataStore,
225-
chunk_size: Optional[int] = None,
226-
run_config: Optional[RunConfig] = None,
227-
) -> Tuple[int, Iterable[IndexDF]]:
225+
chunk_size: int | None = None,
226+
run_config: RunConfig | None = None,
227+
) -> tuple[int, Iterable[IndexDF]]:
228228
raise NotImplementedError()
229229

230230
def get_change_list_process_ids(
231231
self,
232232
ds: DataStore,
233233
change_list: ChangeList,
234-
run_config: Optional[RunConfig] = None,
235-
) -> Tuple[int, Iterable[IndexDF]]:
234+
run_config: RunConfig | None = None,
235+
) -> tuple[int, Iterable[IndexDF]]:
236236
raise NotImplementedError()
237237

238238
def run_full(
239239
self,
240240
ds: DataStore,
241-
run_config: Optional[RunConfig] = None,
242-
executor: Optional[Executor] = None,
241+
run_config: RunConfig | None = None,
242+
executor: Executor | None = None,
243243
) -> None:
244244
raise NotImplementedError()
245245

246246
def run_changelist(
247247
self,
248248
ds: DataStore,
249249
change_list: ChangeList,
250-
run_config: Optional[RunConfig] = None,
251-
executor: Optional[Executor] = None,
250+
run_config: RunConfig | None = None,
251+
executor: Executor | None = None,
252252
) -> ChangeList:
253253
raise NotImplementedError()
254254

255255
def run_idx(
256256
self,
257257
ds: DataStore,
258258
idx: IndexDF,
259-
run_config: Optional[RunConfig] = None,
259+
run_config: RunConfig | None = None,
260260
) -> ChangeList:
261261
raise NotImplementedError()
262262

@@ -270,7 +270,7 @@ class PipelineStep(ABC):
270270
"""
271271

272272
@abstractmethod
273-
def build_compute(self, ds: DataStore, catalog: Catalog) -> List[ComputeStep]:
273+
def build_compute(self, ds: DataStore, catalog: Catalog) -> list[ComputeStep]:
274274
raise NotImplementedError
275275

276276

@@ -288,11 +288,11 @@ def __init__(self, ds: DataStore, catalog: Catalog, pipeline: Pipeline):
288288
self.steps = build_compute(ds, catalog, pipeline)
289289

290290

291-
def build_compute(ds: DataStore, catalog: Catalog, pipeline: Pipeline) -> List[ComputeStep]:
291+
def build_compute(ds: DataStore, catalog: Catalog, pipeline: Pipeline) -> list[ComputeStep]:
292292
with tracer.start_as_current_span("build_compute"):
293293
catalog.init_all_tables(ds)
294294

295-
compute_pipeline: List[ComputeStep] = []
295+
compute_pipeline: list[ComputeStep] = []
296296

297297
for step in pipeline.steps:
298298
compute_pipeline.extend(step.build_compute(ds, catalog))
@@ -304,7 +304,7 @@ def build_compute(ds: DataStore, catalog: Catalog, pipeline: Pipeline) -> List[C
304304
return compute_pipeline
305305

306306

307-
def print_compute(steps: List[ComputeStep]) -> None:
307+
def print_compute(steps: list[ComputeStep]) -> None:
308308
import pprint
309309

310310
pprint.pp(steps)
@@ -313,8 +313,8 @@ def print_compute(steps: List[ComputeStep]) -> None:
313313
def run_steps(
314314
ds: DataStore,
315315
steps: Sequence[ComputeStep],
316-
run_config: Optional[RunConfig] = None,
317-
executor: Optional[Executor] = None,
316+
run_config: RunConfig | None = None,
317+
executor: Executor | None = None,
318318
) -> None:
319319
for step in steps:
320320
with tracer.start_as_current_span(
@@ -332,7 +332,7 @@ def run_pipeline(
332332
ds: DataStore,
333333
catalog: Catalog,
334334
pipeline: Pipeline,
335-
run_config: Optional[RunConfig] = None,
335+
run_config: RunConfig | None = None,
336336
) -> None:
337337
steps = build_compute(ds, catalog, pipeline)
338338
run_steps(ds, steps, run_config)
@@ -343,7 +343,7 @@ def run_changelist(
343343
catalog: Catalog,
344344
pipeline: Pipeline,
345345
changelist: ChangeList,
346-
run_config: Optional[RunConfig] = None,
346+
run_config: RunConfig | None = None,
347347
) -> None:
348348
steps = build_compute(ds, catalog, pipeline)
349349

@@ -352,10 +352,10 @@ def run_changelist(
352352

353353
def run_steps_changelist(
354354
ds: DataStore,
355-
steps: List[ComputeStep],
355+
steps: list[ComputeStep],
356356
changelist: ChangeList,
357-
run_config: Optional[RunConfig] = None,
358-
executor: Optional[Executor] = None,
357+
run_config: RunConfig | None = None,
358+
executor: Executor | None = None,
359359
) -> None:
360360
# FIXME extract Batch* steps to separate module
361361
from datapipe.step.batch_transform import BaseBatchTransformStep

datapipe/datatable.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import logging
2-
from typing import TYPE_CHECKING, Any, Dict, Optional, cast
2+
from typing import TYPE_CHECKING, Any, cast
33

44
import pandas as pd
55
from opentelemetry import trace
@@ -38,10 +38,10 @@ def __init__(
3838
self.primary_schema = meta.primary_schema
3939
self.primary_keys = meta.primary_keys
4040

41-
def get_metadata(self, idx: Optional[IndexDF] = None) -> MetadataDF:
41+
def get_metadata(self, idx: IndexDF | None = None) -> MetadataDF:
4242
return self.meta.get_metadata(idx)
4343

44-
def get_data(self, idx: Optional[IndexDF] = None) -> DataDF:
44+
def get_data(self, idx: IndexDF | None = None) -> DataDF:
4545
return self.table_store.read_rows(self.meta.get_existing_idx(idx))
4646

4747
def reset_metadata(self):
@@ -56,9 +56,9 @@ def get_size(self) -> int:
5656
def store_chunk(
5757
self,
5858
data_df: DataDF,
59-
processed_idx: Optional[IndexDF] = None,
60-
now: Optional[float] = None,
61-
run_config: Optional[RunConfig] = None,
59+
processed_idx: IndexDF | None = None,
60+
now: float | None = None,
61+
run_config: RunConfig | None = None,
6262
) -> IndexDF:
6363
"""
6464
Записать новые данные в таблицу.
@@ -134,8 +134,8 @@ def store_chunk(
134134
def delete_by_idx(
135135
self,
136136
idx: IndexDF,
137-
now: Optional[float] = None,
138-
run_config: Optional[RunConfig] = None,
137+
now: float | None = None,
138+
run_config: RunConfig | None = None,
139139
) -> None:
140140
if len(idx) > 0:
141141
logger.debug(f"Deleting {len(idx.index)} rows from {self.name} data")
@@ -146,8 +146,8 @@ def delete_by_idx(
146146
def delete_stale_by_process_ts(
147147
self,
148148
process_ts: float,
149-
now: Optional[float] = None,
150-
run_config: Optional[RunConfig] = None,
149+
now: float | None = None,
150+
run_config: RunConfig | None = None,
151151
) -> None:
152152
for deleted_df in self.meta.get_stale_idx(process_ts, run_config=run_config):
153153
deleted_idx = data_to_index(deleted_df, self.primary_keys)
@@ -165,7 +165,7 @@ def __init__(
165165

166166
self.meta_dbconn = meta_dbconn
167167
self.event_logger = EventLogger()
168-
self.tables: Dict[str, DataTable] = {}
168+
self.tables: dict[str, DataTable] = {}
169169

170170
# TODO move initialization outside
171171
self.meta_plane = SQLMetaPlane(dbconn=meta_dbconn, create_meta_table=create_meta_table)

0 commit comments

Comments
 (0)