Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
281bf5b
Phase 1: Add TransformInputOffsetTable infrastructure
halconel Oct 8, 2025
d23151e
Phase 2: Implement offset-based query optimization with runtime switc…
halconel Oct 8, 2025
926c00a
Phase 3: Add automatic offset updates after successful processing
halconel Oct 8, 2025
1ddef28
Add safe error handling for missing offset table
halconel Oct 8, 2025
5c8968e
Phase 4: Add offset initialization and fix delete tracking
halconel Oct 9, 2025
753df1d
Phase 5: Add performance tests with scalability analysis
halconel Oct 10, 2025
f6e5cc0
Fix CI errors: type annotations, SQL compatibility, and test assertions
halconel Oct 10, 2025
1e711ca
Refactor init_offsets docstring
halconel Oct 10, 2025
565f10d
Refactor optimization flag checking into helper methods
halconel Oct 10, 2025
979220b
Fix race condition in offset updates with atomic max operation
halconel Oct 10, 2025
fae5b72
Merge remote-tracking branch 'origin/master' into pr/halconel/356
elephantum Oct 15, 2025
93f2685
[Looky-7769] fix: pandas merge performance by filtered join
halconel Oct 27, 2025
ed77858
[Looky-7769] fix: include join_keys columns in idx for filtered join …
halconel Oct 27, 2025
497adfa
[Looky-7769] feat: add comprehensive tests for multi-table filtered j…
halconel Oct 28, 2025
99353dc
[Looky-7769] fix: join with data-table to reach additional_columns
halconel Oct 28, 2025
95a2341
[Looky-7769] fix: implement reverse join for reference tables in filt…
halconel Oct 28, 2025
a45b918
[Looky-7769] fix: add type annotation for error_select_cols in sql_meta
halconel Oct 28, 2025
183a66a
[Looky-7769] fix: create offsets for JoinSpec tables with join_keys d…
halconel Oct 29, 2025
f794999
[Looky-7769] feat: add test for three-table filtered join with v1 vs …
halconel Oct 29, 2025
47f7ec6
Merge pull request #3 from halconel/Looky-7769/offsets-hybrid
halconel Nov 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions datapipe/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,73 @@ def migrate_transform_tables(ctx: click.Context, labels: str, name: str) -> None
return migrations_v013.migrate_transform_tables(app, batch_transforms_steps)


@cli.command()
@click.option("--step", type=click.STRING, help="Step name to initialize offsets for (optional)")
@click.pass_context
def init_offsets(ctx: click.Context, step: Optional[str]) -> None:
    """
    Initialize the offset table from existing TransformMetaTable data.

    Scans data that has already been processed and sets the initial offset
    values, enabling a smooth migration to the offset-based query
    optimization (the v2 method).

    If --step is given, only that step is initialized. Otherwise every
    BatchTransformStep instance in the pipeline is initialized.
    """
    from datapipe.meta.sql_meta import initialize_offsets_from_transform_meta

    app: DatapipeApp = ctx.obj["app"]

    # Collect all BatchTransformStep instances (optionally filtered by name)
    transform_steps = []
    for compute_step in app.steps:
        if isinstance(compute_step, BaseBatchTransformStep):
            if step is None or compute_step.get_name() == step:
                transform_steps.append(compute_step)

    if not transform_steps:
        if step:
            rprint(f"[red]Step '{step}' not found or is not a BatchTransformStep[/red]")
        else:
            rprint("[yellow]No BatchTransformStep instances found in pipeline[/yellow]")
        return

    rprint(f"[cyan]Found {len(transform_steps)} transform step(s) to initialize[/cyan]")

    # Initialize offsets for each step.
    # results maps step name -> offsets dict on success (possibly empty),
    # or None when initialization raised — the summary below relies on this.
    results: Dict[str, Optional[Dict[str, Any]]] = {}
    for transform_step in transform_steps:
        step_name = transform_step.get_name()
        rprint(f"\n[cyan]Initializing offsets for: {step_name}[/cyan]")

        try:
            offsets = initialize_offsets_from_transform_meta(app.ds, transform_step)

            if offsets:
                rprint(f"[green]✓ Initialized {len(offsets)} offset(s):[/green]")
                for input_name, offset_value in offsets.items():
                    rprint(f"  - {input_name}: {offset_value}")
                results[step_name] = offsets
            else:
                rprint("[yellow]No offsets initialized (no processed data found)[/yellow]")
                results[step_name] = {}

        except Exception as e:
            rprint(f"[red]✗ Failed to initialize: {e}[/red]")
            # BUG FIX: store None (not {}) so the failure is counted as
            # "Failed" in the summary instead of "Empty (no data)".
            results[step_name] = None

    # Summary
    rprint("\n[cyan]═══ Summary ═══[/cyan]")
    success_count = sum(1 for v in results.values() if v is not None and len(v) > 0)
    empty_count = sum(1 for v in results.values() if v is not None and len(v) == 0)
    failed_count = sum(1 for v in results.values() if v is None)

    rprint(f"[green]Successful: {success_count}[/green]")
    rprint(f"[yellow]Empty (no data): {empty_count}[/yellow]")
    if failed_count > 0:
        rprint(f"[red]Failed: {failed_count}[/red]")


try:
entry_points = metadata.entry_points(group="datapipe.cli") # type: ignore
except TypeError:
Expand Down
3 changes: 3 additions & 0 deletions datapipe/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ class StepStatus:
class ComputeInput:
    """One input of a compute step: the data table and how to join it into the step's index."""

    # Source data table for this input.
    dt: DataTable
    # How this input participates in the index join; defaults to "full".
    join_type: Literal["inner", "full"] = "full"
    # Filtered join optimization: mapping from idx columns to dt columns
    # Example: {"user_id": "id"} means filter dt by dt.id IN (idx.user_id)
    join_keys: Optional[Dict[str, str]] = None


class ComputeStep:
Expand Down
5 changes: 4 additions & 1 deletion datapipe/datatable.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from opentelemetry import trace

from datapipe.event_logger import EventLogger
from datapipe.meta.sql_meta import MetaTable
from datapipe.meta.sql_meta import MetaTable, TransformInputOffsetTable
from datapipe.run_config import RunConfig
from datapipe.store.database import DBConn
from datapipe.store.table_store import TableStore
Expand Down Expand Up @@ -165,6 +165,9 @@ def __init__(

self.create_meta_table = create_meta_table

    # Create the offset table (reusing the same create_meta_table flag)
self.offset_table = TransformInputOffsetTable(meta_dbconn, create_table=create_meta_table)

def create_table(self, name: str, table_store: TableStore) -> DataTable:
assert name not in self.tables

Expand Down
Loading