Optimize BatchTransformStep incremental updates with offset-based query optimization #356
Implement offset table for tracking last processed update_ts per transformation and input table. This will enable optimization of changed data detection by replacing the FULL OUTER JOIN with simple WHERE update_ts > offset filters.

Changes:
- Add TransformInputOffsetTable class in datapipe/meta/sql_meta.py with:
  - Table schema with (transformation_id, input_table_name, update_ts_offset)
  - CRUD methods: get_offset, update_offset, update_offsets_bulk, reset_offset
  - Statistics methods: get_statistics, get_offset_count
  - Automatic table and index creation via create_table=True flag
- Integrate offset_table into DataStore.__init__ in datapipe/datatable.py
- Add comprehensive unit tests in tests/test_offset_table.py (12 test cases)
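For orientation, here is a minimal sketch of what the offset table schema described above could look like in SQLAlchemy. The column names follow the commit message; the physical table name, column types, and key layout are assumptions, not the actual TransformInputOffsetTable implementation.

```python
from sqlalchemy import Column, Float, MetaData, String, Table

metadata = MetaData()

# One row per (transformation, input table) pair; update_ts_offset stores the
# last processed update_ts for that input.
transform_input_offset = Table(
    "transform_input_offset",  # table name assumed for illustration
    metadata,
    Column("transformation_id", String(256), primary_key=True),
    Column("input_table_name", String(256), primary_key=True),
    Column("update_ts_offset", Float, nullable=False, default=0.0),
)
```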
…hing

Add build_changed_idx_sql_v2 that uses offset filtering instead of FULL OUTER JOIN for finding changed records. Include runtime switching between v1 and v2 methods, performance logging, and comprehensive integration tests.

Changes:
- Add build_changed_idx_sql_v2() in datapipe/meta/sql_meta.py:
  - Use WHERE update_ts > offset filter per input table instead of JOIN
  - UNION changed records from all inputs + error records
  - Fix N+1 problem with get_offsets_for_transformation() bulk method
  - LEFT JOIN with transform_meta only for priority/ordering (O(N log M))
- Add use_offset_optimization parameter to batch transform classes:
  - BaseBatchTransformStep, BatchTransformStep, DatatableBatchTransformStep
  - Runtime override via RunConfig.labels["use_offset_optimization"]
- Add performance monitoring in datapipe/step/batch_transform.py:
  - Log query build time in _build_changed_idx_sql()
  - Log query execution time in get_full_process_ids()
  - OpenTelemetry spans for tracing (v1_join vs v2_offset)
- Add integration tests:
  - tests/test_build_changed_idx_sql_v2.py (5 tests for v2 SQL logic)
  - tests/test_batch_transform_with_offset_optimization.py (6 end-to-end tests)
  - tests/test_offset_optimization_runtime_switch.py (5 tests, 2 xfail for Phase 3)
- Fix duplicate select import in batch_transform.py

Performance: V2 is O(N log M) vs V1's O(M_total), where N = changed records and M = total records. Example: 1000 new / 10M total = 100-1000x faster.

Test results: 599 passed, 2 failed (external deps), 3 xfailed
Lint: flake8 ✓, mypy ✓
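The core idea of the v2 query is sketched below under assumed names (a hypothetical changed_idx_select helper, an id column, and metadata tables keyed by input name). The real build_changed_idx_sql_v2 additionally UNIONs error records and LEFT JOINs transform_meta for priority/ordering.

```python
from typing import Dict

import sqlalchemy as sa

def changed_idx_select(input_metas: Dict[str, sa.Table], offsets: Dict[str, float]):
    # One cheap, index-friendly range scan per input table instead of a
    # FULL OUTER JOIN over all records, then UNION the changed ids together.
    selects = [
        sa.select(meta.c.id).where(meta.c.update_ts > offsets.get(name, 0.0))
        for name, meta in input_metas.items()
    ]
    return sa.union(*selects)
```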
Implement automatic offset table updates in store_batch_result() to track the last processed update_ts for each input table. Offsets are updated for all transformations regardless of the use_offset_optimization flag, allowing gradual migration and immediate benefit when the optimization is enabled.

Changes:
- Add _get_max_update_ts_for_batch() in datapipe/step/batch_transform.py:
  - Query max(update_ts) for successfully processed records only
  - Use processed_idx from output_dfs, not the full input idx
  - Returns None if no records were processed
- Update store_batch_result() to auto-update offsets:
  - Extract processed_idx from output_dfs using data_to_index()
  - Call _get_max_update_ts_for_batch() for each input table
  - Bulk-update offsets via update_offsets_bulk() in one transaction
  - Works even when use_offset_optimization=False (prepares for future use)
- Fix test_batch_transform_offset_with_error_retry:
  - Change from partial batch processing to exception-based errors
  - Add chunk_size=1 to process records individually
  - Ensures error records are marked as is_success=False, not deleted
- Remove xfail markers from 2 runtime switching tests (now passing)
- Add comprehensive integration tests in tests/test_offset_auto_update.py:
  - test_offset_auto_update_integration: full lifecycle with offset updates
  - test_offset_update_with_multiple_inputs: independent offset per input
  - test_offset_updated_even_when_optimization_disabled: offsets always update
  - test_offset_not_updated_on_empty_result: no offset when output is empty
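A simplified sketch of the offset-advancement rule described above. The real _get_max_update_ts_for_batch() queries the input's metadata table for the processed_idx records; here it is reduced to an in-memory DataFrame for brevity.

```python
from typing import Optional

import pandas as pd

def max_update_ts_for_batch(processed_meta_df: pd.DataFrame) -> Optional[float]:
    # Only records that were actually processed advance the offset;
    # an empty batch leaves the offset untouched.
    if processed_meta_df.empty:
        return None
    return float(processed_meta_df["update_ts"].max())
```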
Add try-except blocks to gracefully handle cases when the offset table doesn't exist (create_meta_table=False). This allows offset optimization to work seamlessly even without the table, falling back to processing all data.

Changes:
- Add try-except in get_offsets_for_transformation():
  - Returns an empty dict if the table doesn't exist
  - v2 method treats empty offsets as 0.0 (process all data)
- Add try-except in store_batch_result():
  - Logs a warning if the offset update fails due to the missing table
  - Continues execution without breaking the transformation
- Add test_works_without_offset_table():
  - Simulates a missing offset table by dropping it
  - Verifies the transformation completes successfully
  - Confirms data is processed correctly with the v2 method

Behavior when the offset table is missing:
- v2 query method: processes all data (offset defaults to 0.0)
- Offset updates: logs a warning, continues without error
- No impact on transformation success
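A sketch of the fallback behaviour with assumed names (get_offsets_safe is illustrative, not the actual get_offsets_for_transformation): if the offset table was never created, return no offsets and let the v2 query process everything.

```python
import logging

import sqlalchemy as sa
from sqlalchemy.exc import DBAPIError

logger = logging.getLogger(__name__)

def get_offsets_safe(conn, offset_table: sa.Table, transformation_id: str) -> dict:
    # conn is an open SQLAlchemy connection
    try:
        rows = conn.execute(
            sa.select(
                offset_table.c.input_table_name,
                offset_table.c.update_ts_offset,
            ).where(offset_table.c.transformation_id == transformation_id)
        )
        return {name: offset for name, offset in rows}
    except DBAPIError:
        # Offset table missing (create_meta_table=False): an empty dict means
        # "offset 0.0 everywhere", i.e. process all data.
        logger.warning("Offset table not available; processing all data")
        return {}
```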
Implement offset initialization from existing TransformMetaTable data and fix a critical bug in delete record tracking for offset optimization.

Changes:
- Add initialize_offsets_from_transform_meta() in datapipe/meta/sql_meta.py:
  - Conservative approach: MAX(input.update_ts) <= MIN(transform.process_ts)
  - Bulk-updates offsets for all input tables
  - Safe error handling for a missing offset table
  - Returns a dict of initialized offsets
- Add init_offsets CLI command in datapipe/cli.py:
  - Supports --step option to initialize a specific transform
  - Initializes all BatchTransformStep instances by default
  - Rich colored output with success/failure summary
- Fix delete tracking in build_changed_idx_sql_v2():
  - Changed: WHERE update_ts > offset AND delete_ts IS NULL
  - To: WHERE update_ts > offset OR (delete_ts IS NOT NULL AND delete_ts > offset)
  - Includes deleted records in change detection
- Fix offset updates in _get_max_update_ts_for_batch():
  - Changed: MAX(update_ts)
  - To: MAX(GREATEST(update_ts, COALESCE(delete_ts, 0.0)))
  - Prevents reprocessing deleted records
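The two delete-tracking fixes translate into roughly the following SQLAlchemy expressions; this is a sketch with assumed helper names and column objects, not the actual code.

```python
import sqlalchemy as sa

def changed_or_deleted_filter(meta: sa.Table, offset: float):
    # A record counts as changed if it was updated after the offset, OR if it
    # was deleted after the offset, so deletes are not silently skipped.
    return sa.or_(
        meta.c.update_ts > offset,
        sa.and_(meta.c.delete_ts.isnot(None), meta.c.delete_ts > offset),
    )

def offset_candidate(meta: sa.Table):
    # The new offset also has to move past deletes:
    # GREATEST(update_ts, COALESCE(delete_ts, 0.0))
    return sa.func.greatest(meta.c.update_ts, sa.func.coalesce(meta.c.delete_ts, 0.0))
```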
- Create tests/performance/ directory for load tests separate from regular tests
- Add fast_bulk_insert() helper for rapid data preparation using pandas bulk inserts
- Add fast_data_loader fixture for efficient large dataset generation (50K records/batch)
- Add perf_pipeline_factory fixture for creating test pipelines with configurable offset optimization
- Implement 4 performance tests:
  * test_performance_small_dataset: 10K records baseline test
  * test_performance_large_dataset_with_timeout: 100K records with 60s timeout
  * test_performance_incremental_updates: 50K initial + 1K new records
  * test_performance_scalability_analysis: 100K/500K/1M with extrapolation to 10M/100M
- Add timeout() context manager using signal.SIGALRM to prevent hanging on the slow v1 method
- Use initialize_offsets_from_transform_meta() for fast setup on large datasets (500K+)
- Implement linear regression for v1 O(N) scaling and extrapolation
- Modify tests/conftest.py with a pytest_collection_modifyitems hook to exclude performance tests from regular test runs
  * Regular tests: pytest tests/ -v (excludes performance automatically)
  * Performance tests: pytest tests/performance/ -v -s (run separately)
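A minimal sketch of the signal.SIGALRM-based timeout() helper mentioned above (Unix-only; the actual helper in tests/performance/ may differ in details):

```python
import signal
from contextlib import contextmanager

@contextmanager
def timeout(seconds: int):
    def _raise(signum, frame):
        raise TimeoutError(f"Timed out after {seconds}s")

    old_handler = signal.signal(signal.SIGALRM, _raise)
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)  # cancel the pending alarm
        signal.signal(signal.SIGALRM, old_handler)
```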
The CI test run results can be viewed here:
…ered join optimization
…uring incremental processing
[Looky-7769] fix: pandas merge performance by filtered join
Moved to #362
Summary
Ref PR: halconel#3
This PR implements offset-based query optimization for `BatchTransformStep` to improve the performance of incremental updates on large datasets. Instead of using a FULL OUTER JOIN that scans all records, the new approach uses offset timestamps to filter only changed records.

Implementation Phases
Phase 1: Infrastructure
- `TransformInputOffsetTable` to track the last processed timestamp per input
- `(transform_name, input_key, offset_ts)` columns
- `datapipe init-offsets` command for initialization

Phase 2: Offset-based Query Optimization
- `_build_changed_idx_sql_v2()` for offset-based filtering
- `use_offset_optimization` parameter

Phase 3: Automatic Offset Updates
- Offsets updated automatically in `store_batch_result()` after each processed batch
Phase 4: Initialization & Delete Tracking
- `initialize_offsets_from_transform_meta()` for fast setup from existing pipelines
- Delete tracking: deleted records included in change detection and offset updates

Phase 5: Performance Testing
- Load tests in `tests/performance/`

Performance Analysis
Complexity Comparison
V2 (with offset optimization):
Total: O(N log M + N log N) ≈ O(N log M)

V1 (without offset, FULL OUTER JOIN):
Total: O(M_1 + M_2 + ... + M_T) - processes ALL records

Here N is the number of changed records, M the total number of records per input table, and T the number of input tables.
Example
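As a rough back-of-the-envelope illustration with the numbers quoted in the commit notes (1,000 changed records out of 10M total): V2 touches on the order of N·log2(M) ≈ 1,000 × 23 ≈ 23,000 index entries, while V1 scans all 10,000,000 rows, roughly two to three orders of magnitude more work, which matches the quoted 100-1000x speedup.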
Performance Test Results
Run tests: `pytest tests/performance/ -v -s` (performance tests are excluded from the regular `pytest tests/ -v` run).
Hardware setup: local machine with 32 GB RAM and a fast SSD. A production environment with PostgreSQL and ~20 MiB/s disk read speed will show lower absolute performance but the same scaling patterns.
Scalability Analysis & Extrapolation
Extrapolation to Larger Datasets
Conclusion
💡 The performance test results also show that the new method handles a full-table transformation (all records) as fast as the old method, which suggests the old method could eventually be replaced entirely by the new one.
Test Coverage
Migration Guide
For New Pipelines
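A minimal sketch of opting a new step into the optimization. Only the `use_offset_optimization` flag comes from this PR; the import path, transform function, and remaining constructor arguments are illustrative assumptions.

```python
import pandas as pd

from datapipe.step.batch_transform import BatchTransform  # import path assumed

def double_values(df: pd.DataFrame) -> pd.DataFrame:
    # Example user transform (illustrative)
    return df.assign(value=df["value"] * 2)

step = BatchTransform(
    double_values,
    inputs=["input_table"],
    outputs=["output_table"],
    use_offset_optimization=True,  # opt in to v2 offset-based change detection
)
```

At runtime the behaviour can reportedly also be overridden via `RunConfig.labels["use_offset_optimization"]`.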
For Existing Pipelines
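For pipelines that already contain processed data, the commits describe seeding offsets via `initialize_offsets_from_transform_meta()`. The snippet below is only a guess at its call shape (the argument list is assumed); the function name and its conservative behaviour come from the PR.

```python
from datapipe.meta.sql_meta import initialize_offsets_from_transform_meta

def seed_offsets(ds, step) -> dict:
    # ds: an existing DataStore, step: an existing BatchTransformStep.
    # Initialization is conservative per the PR:
    # MAX(input.update_ts) <= MIN(transform.process_ts).
    return initialize_offsets_from_transform_meta(ds, step)  # dict of offsets per input
```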
Or use the CLI: `datapipe init-offsets` (with `--step` to initialize a specific transform).
Backward Compatibility
Offset optimization is disabled by default (`use_offset_optimization=False`), so existing pipelines keep the current V1 behaviour unless they explicitly opt in.