Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
281bf5b
Phase 1: Add TransformInputOffsetTable infrastructure
halconel Oct 8, 2025
d23151e
Phase 2: Implement offset-based query optimization with runtime switc…
halconel Oct 8, 2025
926c00a
Phase 3: Add automatic offset updates after successful processing
halconel Oct 8, 2025
1ddef28
Add safe error handling for missing offset table
halconel Oct 8, 2025
5c8968e
Phase 4: Add offset initialization and fix delete tracking
halconel Oct 9, 2025
753df1d
Phase 5: Add performance tests with scalability analysis
halconel Oct 10, 2025
f6e5cc0
Fix CI errors: type annotations, SQL compatibility, and test assertions
halconel Oct 10, 2025
1e711ca
Refactor init_offsets docstring
halconel Oct 10, 2025
565f10d
Refactor optimization flag checking into helper methods
halconel Oct 10, 2025
979220b
Fix race condition in offset updates with atomic max operation
halconel Oct 10, 2025
fae5b72
Merge remote-tracking branch 'origin/master' into pr/halconel/356
elephantum Oct 15, 2025
93f2685
[Looky-7769] fix: pandas merge performance by filtered join
halconel Oct 27, 2025
ed77858
[Looky-7769] fix: include join_keys columns in idx for filtered join …
halconel Oct 27, 2025
497adfa
[Looky-7769] feat: add comprehensive tests for multi-table filtered j…
halconel Oct 28, 2025
99353dc
[Looky-7769] fix: join with data-table to reach additional_columns
halconel Oct 28, 2025
95a2341
[Looky-7769] fix: implement reverse join for reference tables in filt…
halconel Oct 28, 2025
a45b918
[Looky-7769] fix: add type annotation for error_select_cols in sql_meta
halconel Oct 28, 2025
183a66a
[Looky-7769] fix: create offsets for JoinSpec tables with join_keys d…
halconel Oct 29, 2025
f794999
[Looky-7769] feat: add test for three-table filtered join with v1 vs …
halconel Oct 29, 2025
47f7ec6
Merge pull request #3 from halconel/Looky-7769/offsets-hybrid
halconel Nov 20, 2025
da879f2
source /home/elephantum/Epoch8/Datapipe/datapipe/.venv/bin/activate
elephantum Nov 25, 2025
8b76fd5
Add use_offset_optimization field to BatchTransform and DatatableBatc…
halconel Dec 2, 2025
f258a5b
[Looky-7769] fix: add comprehensive test suite and documentation for …
halconel Dec 11, 2025
3d91c56
[Looky-7769] fix: implement ORDER BY update_ts to prevent data loss i…
halconel Dec 11, 2025
7ccbc38
[Looky-7769] fix: change strict inequality to >= and add process_ts f…
halconel Dec 12, 2025
93eb568
[Looky-7769] fix: add warning when store_chunk is called with past ti…
halconel Dec 12, 2025
ab5db8f
[Looky-7769] docs: add offset optimization documentation and remove t…
halconel Dec 12, 2025
6fb128d
[Looky-7769] fix: correct test expectations in offset edge cases tests
halconel Dec 12, 2025
a83cf78
[Looky-7769] fix: literals are restricted in GROUP BY clause
halconel Dec 12, 2025
5a0eeae
[Looky-7769] fix: ensure atomic offset commit at end of run_full by p…
halconel Dec 17, 2025
d93d3bc
[Looky-7769] fix: rename variables in ChangeList.extend to resolve my…
halconel Dec 17, 2025
c297a00
[Looky-7769] fix: increase timing delays in flaky tests and skip conc…
halconel Dec 17, 2025
21a21e9
[Looky-7769] fix: skip custom ordering test for SQLite due to NULLS L…
halconel Dec 17, 2025
c050248
doc: add comprehensive offset optimization documentation
halconel Dec 25, 2025
e299b4b
doc: split offset optimization documentation into separate feature files
halconel Dec 25, 2025
f1d7e57
feat: add unique CTE naming for reusing same table with different joi…
halconel Dec 27, 2025
f89a094
feat: fix deleted records not being removed from output when using jo…
halconel Dec 27, 2025
69d422f
docs: remove offset optimization explanation files that should only e…
halconel Dec 27, 2025
3e9ece4
refactor: simplify verbose comments in offset optimization code
halconel Dec 27, 2025
c5bbc93
feat: add deduplication and cross join support for offset optimizatio…
halconel Dec 27, 2025
63e0cbd
refactor: reorganize offset optimization functions with symmetric nam…
halconel Dec 28, 2025
8743a43
fix: apply epsilon-adjusted offset in WHERE clause to prevent data lo…
halconel Dec 30, 2025
a4a9877
perf: replace OR with UNION in offset WHERE clauses to enable index u…
halconel Dec 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions datapipe/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,73 @@ def migrate_transform_tables(ctx: click.Context, labels: str, name: str) -> None
return migrations_v013.migrate_transform_tables(app, batch_transforms_steps)


@cli.command()
@click.option("--step", type=click.STRING, help="Step name to initialize offsets for (optional)")
@click.pass_context
def init_offsets(ctx: click.Context, step: Optional[str]) -> None:
    """
    Initialize the offset table from existing TransformMetaTable data.

    Scans already-processed data and sets initial offset values so that
    migration to offset-based query optimization (the v2 method) is smooth.

    If --step is given, only that step is initialized; otherwise every
    BatchTransformStep instance in the pipeline is initialized.
    """
    from datapipe.meta.sql_meta import initialize_offsets_from_transform_meta

    app: DatapipeApp = ctx.obj["app"]

    # Collect all BatchTransformStep instances, optionally filtered by name.
    transform_steps = []
    for compute_step in app.steps:
        if isinstance(compute_step, BaseBatchTransformStep):
            if step is None or compute_step.get_name() == step:
                transform_steps.append(compute_step)

    if not transform_steps:
        if step:
            rprint(f"[red]Step '{step}' not found or is not a BatchTransformStep[/red]")
        else:
            rprint("[yellow]No BatchTransformStep instances found in pipeline[/yellow]")
        return

    rprint(f"[cyan]Found {len(transform_steps)} transform step(s) to initialize[/cyan]")

    # Initialize offsets for each step.  `results` maps step name to:
    #   non-empty dict -> success, {} -> no processed data, None -> failure.
    results = {}
    for transform_step in transform_steps:
        step_name = transform_step.get_name()
        rprint(f"\n[cyan]Initializing offsets for: {step_name}[/cyan]")

        try:
            offsets = initialize_offsets_from_transform_meta(app.ds, transform_step)
        except Exception as e:
            rprint(f"[red]✗ Failed to initialize: {e}[/red]")
            # Fix: record None (not {}) on failure, otherwise the summary's
            # failed_count below — which counts None values — is always 0 and
            # failures get misreported as "Empty (no data)".
            results[step_name] = None
            continue

        if offsets:
            rprint(f"[green]✓ Initialized {len(offsets)} offset(s):[/green]")
            for input_name, offset_value in offsets.items():
                rprint(f"  - {input_name}: {offset_value}")
            results[step_name] = offsets
        else:
            rprint("[yellow]No offsets initialized (no processed data found)[/yellow]")
            results[step_name] = {}

    # Summary
    rprint("\n[cyan]═══ Summary ═══[/cyan]")
    success_count = sum(1 for v in results.values() if v is not None and len(v) > 0)
    empty_count = sum(1 for v in results.values() if v is not None and len(v) == 0)
    failed_count = sum(1 for v in results.values() if v is None)

    rprint(f"[green]Successful: {success_count}[/green]")
    rprint(f"[yellow]Empty (no data): {empty_count}[/yellow]")
    if failed_count > 0:
        rprint(f"[red]Failed: {failed_count}[/red]")


try:
entry_points = metadata.entry_points(group="datapipe.cli") # type: ignore
except TypeError:
Expand Down
3 changes: 3 additions & 0 deletions datapipe/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ class StepStatus:
class ComputeInput:
    """One input of a compute step: a data table plus how it participates in the step's join."""
    dt: DataTable
    # "full" (default) or "inner" join semantics for this input — presumably
    # consumed by the transform's index/join planner; TODO confirm at call sites.
    join_type: Literal["inner", "full"] = "full"
    # Filtered join optimization: mapping from idx columns to dt columns
    # Example: {"user_id": "id"} means filter dt by dt.id IN (idx.user_id)
    join_keys: Optional[Dict[str, str]] = None


class ComputeStep:
Expand Down
5 changes: 4 additions & 1 deletion datapipe/datatable.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from opentelemetry import trace

from datapipe.event_logger import EventLogger
from datapipe.meta.sql_meta import MetaTable
from datapipe.meta.sql_meta import MetaTable, TransformInputOffsetTable
from datapipe.run_config import RunConfig
from datapipe.store.database import DBConn
from datapipe.store.table_store import TableStore
Expand Down Expand Up @@ -170,6 +170,9 @@ def __init__(

self.create_meta_table = create_meta_table

# Create the offset table (reusing the same create_meta_table flag)
self.offset_table = TransformInputOffsetTable(meta_dbconn, create_table=create_meta_table)

def create_table(self, name: str, table_store: TableStore) -> DataTable:
assert name not in self.tables

Expand Down
Loading