Enhancement/11460886943/dense numeric static schema compaction #2985
alexowens90 wants to merge 7 commits into master
Conversation
a528945 to fd3724d
ArcticDB Code Review Summary

The PR adds a new compact_data_experimental API. The latest commit (a65a94d) adds a first_rows_to_discard cache to complement first_rows_to_keep, eliminating redundant binary searches for discarded rows.

Sections reviewed: API & Compatibility, Memory & Safety, Correctness, Performance, Code Quality, Testing, Build & Dependencies, Security, PR Title & Description.
873e3a8 to c99ab1a
#include <memory>
#include <ranges>
#include <vector>
std::accumulate requires <numeric>, which is not guaranteed to be pulled in transitively across compilers/platforms. This file only includes <memory>, <ranges>, and <vector>, which may happen to provide it on GCC but will likely fail on MSVC or Clang. Add the missing include before <ranges>: #include <numeric>
ArcticDB Code Review Summary

PR: Enhancement: dense numeric static schema compaction

Sections reviewed: API & Compatibility, Memory & Safety, Correctness, Code Quality, Testing, Build & Dependencies, Security, PR Title & Description, Documentation.
@pytest.mark.parametrize("rows_per_segment", [3, 7, 10])
def test_compact_data_idempotent(in_memory_store_factory, rows_per_segment):
def test_compact_data_idempotent(in_memory_store_factory, clear_query_stats, rows_per_segment):
Nit: We can probably remove this test now that we always check for idempotency in every generic_compact_data_test.
Yeh fair, will remove
} else if (!first_rows_to_discard.contains(first_row)) {
    auto begin = new_row_ranges.cbegin();
    auto end = new_row_ranges.cend();
    auto mid = begin + std::distance(begin, end) / 2;
I thought std::set iterators are not random access. They can't really be — advancing them is O(log n) at best, which does not qualify as a random-access operation according to the C++ spec as far as I understand.
So hand-rolling binary search like this is likely O(log n * log n). Can't we use lower_bound?
I'm probably misunderstanding something, as I thought this wouldn't compile.
These are iterators over std::vector<RowRange>, which is created from the original set exactly for this reason.
To use lower_bound we would need to craft a less-than operator which takes into account the slightly odd "equality" condition, which I think would be less easy to follow than the existing implementation.
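For intuition on why copying the std::set into a std::vector matters: a sorted vector gives random-access iterators, so a midpoint-based binary search is a true O(log n). Below is a hedged Python analogue of that lookup — find_containing_range, row_ranges, and the half-open (start, end) tuples are illustrative names, not the PR's actual code:

```python
from bisect import bisect_right

def find_containing_range(row_ranges, row):
    """Binary search over a sorted list of non-overlapping half-open
    (start, end) ranges, analogous to searching a std::vector<RowRange>
    built from the original std::set to get random access."""
    starts = [start for start, _ in row_ranges]
    idx = bisect_right(starts, row) - 1  # last range starting at or before `row`
    if idx >= 0 and row_ranges[idx][0] <= row < row_ranges[idx][1]:
        return row_ranges[idx]
    return None

ranges = [(0, 10), (10, 20), (25, 30)]
print(find_containing_range(ranges, 5))   # (0, 10)
print(find_containing_range(ranges, 22))  # None
```

Running the same probe directly against a std::set would pay the extra iterator-advancement cost the reviewer describes, which is exactly what the vector copy avoids.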
while (remaining_rows > 0) {
    auto rows_to_consume = std::min(remaining_rows, slicing_info.rows_per_segment);
for (size_t idx = 0; idx < slicing_info.num_segments; ++idx) {
    auto rows_to_consume = std::min(slicing_info.rows_in_slice(idx), remaining_rows);
Nit: might be better to:
auto rows_to_consume = slicing_info.rows_in_slice(idx);
util::check(rows_to_consume <= remaining_rows); // With the new remainder tracking this should always be true
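For intuition, here is a hedged Python sketch of what such remainder tracking could look like — rows_in_slices is a hypothetical stand-in for the PR's SlicingInfo::rows_in_slice, not its actual implementation. Spreading the remainder so slice sizes differ by at most one row makes the slices sum exactly to the total, which is why the suggested check should always hold:

```python
def rows_in_slices(total_rows, rows_per_segment):
    """Hypothetical remainder tracking: split total_rows into
    ceil(total_rows / rows_per_segment) slices whose sizes differ by at
    most one row, so consuming slice sizes in order never exceeds the
    remaining row count."""
    num_segments = -(-total_rows // rows_per_segment)  # ceil division
    base, remainder = divmod(total_rows, num_segments)
    # The first `remainder` slices each take one extra row.
    return [base + (idx < remainder) for idx in range(num_segments)]

print(rows_in_slices(20, 7))  # [7, 7, 6]
```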
row_counts = index["end_row"] - index["start_row"]
# Definitions taken from CompactDataClause constructor
min_rows_per_segment = max((2 * rows_per_segment) // 3, 1)
max_rows_per_segment = max((4 * rows_per_segment) // 3, rows_per_segment + 1)
So with this change we're now allowed to have segments with a row count larger than the library setting. I just want to flag that this might become a source of errors if there are places in the code assuming that segment row count <= library-setting row count is an invariant. I find it a bit odd; it's worth pointing out somewhere why we can't cap at the row count in the config.
Nothing should be assuming that, as it is perfectly valid to change the lib config setting after data has been written to the library.
The idea was that this is the "optimal" slicing that the user wants. Therefore, if you have a 100k row slice, and you append a 1 row slice, then having a slice with 100,001 rows is clearly better than having a 50,000 and a 50,001 slice.
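The trade-off in this exchange is easy to check numerically. A small sketch using the bound formulas from the CompactDataClause constructor as quoted in the tests (segment_bounds is an illustrative helper name, not part of the API):

```python
def segment_bounds(rows_per_segment):
    """Acceptable post-compaction segment sizes: roughly
    rows_per_segment ± 33%, per the constructor definitions quoted in
    the tests."""
    min_rows = max((2 * rows_per_segment) // 3, 1)
    max_rows = max((4 * rows_per_segment) // 3, rows_per_segment + 1)
    return min_rows, max_rows

min_rows, max_rows = segment_bounds(100_000)
print(min_rows, max_rows)  # 66666 133333
# Appending 1 row to a 100_000-row slice: the merged 100_001-row slice
# is still within bounds, so keeping it beats splitting into
# 50_000 + 50_001 rows.
print(100_001 <= max_rows)  # True
```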
post_compaction_data_keys = len(index)
new_data_keys = len(index[index["version_id"] > vit_before_compaction.version])
expected_get_count = pre_compaction_data_keys - (post_compaction_data_keys - new_data_keys)
nit: what do you think about naming this compacted_data_keys
received_df = lib.read(sym).data
assert_frame_equal(vit_before_compaction.data, received_df)
index = lib.read_index(sym)
assert len(index) == pre_compaction_data_keys
Can't we go one step further and assert_frame_equal the index before and after?
assert row_counts.max() <= max_rows_per_segment
post_compaction_data_keys = len(index)
new_data_keys = len(index[index["version_id"] > vit_before_compaction.version])
Now the number of new keys is computed from the result, meaning that a bug producing more new keys than needed could still end up with passing tests. Is it easy/possible to come up with a deterministic formula to check the expected number of new keys?
I think to do so you would just end up re-implementing the algorithms in CompactDataClause::structure_for_processing. From a testing perspective though, we don't really care how many new data keys there are, the important invariants under test here are:
assert row_counts.min() >= min_rows_per_segment
assert row_counts.max() <= max_rows_per_segment
This is why new_data_keys is only used to assert that we didn't write any extra data keys back to storage.
expected_col_bc = lib.read(sym, columns=["col_b", "col_c"]).data
generic_compact_data_test(lib, sym)
assert_frame_equal(expected_col_a, lib.read(sym, columns=["col_a"]).data)
assert_frame_equal(expected_col_bc, lib.read(sym, columns=["col_b", "col_c"]).data)
nit: can add reading only the index
The data doesn't have an index?
cpp/arcticdb/processing/clause.cpp
}
// The greedy algorithm in structure_row_ranges can reslice data where all segments have an acceptable number of
// rows, which is not desirable, so short-circuit out if this is the case
if (std::all_of(row_ranges.begin(), row_ranges.end(), [this](const RowRange& row_range) {
nit: can use std::ranges::all_of
}

std::vector<std::optional<Column>> SegmentReslicer::reslice_dense_numeric_static_schema_columns(
    std::vector<std::optional<std::shared_ptr<Column>>>&& columns, const SlicingInfo& slicing_info
I find std::optional<std::shared_ptr<Column>> a bit redundant. Can't we achieve the same thing by making the shared_ptr null?
Yeh, I was trying to communicate intent more clearly, but I think the ugliness now outweighs the benefit.
arg_1 = df_1
getattr(lib, method)(arg_0, arg_1, prune_previous_version=arg)

assert len(lt.find_keys(KeyType.TABLE_INDEX)) == 1 if should_be_pruned else 2
This assertion will always fail when should_be_pruned=False for the compact_data_experimental case.
With the setup above (lib.write(sym, df_0) — 10 rows in a single segment) and rows_per_segment=100_000, compact_data_experimental is always a noop: min_rows_per_segment = max(2*100_000 // 3, 1) = 66_666, and the single 10-row segment is the only row range, so structure_for_processing erases it (it is already in processing_row_ranges and has ≤ max_rows_per_segment rows), leaving an empty ranges_and_keys. The function returns the existing v0 without writing a new version.
As a result:
- should_be_pruned=True: expects 1 key → gets 1 key (passes for the wrong reason)
- should_be_pruned=False: expects 2 keys → gets 1 key → fails
To exercise the prune logic, the test needs data that actually requires compaction, e.g. append a second batch first so there are 2 segments whose combined size triggers a rewrite, or use a rows_per_segment that is smaller than the segment already written (≤ 6 so that max_rows_per_segment < 10).
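The arithmetic behind the noop claim can be sanity-checked in a few lines. The bound formulas below come from the CompactDataClause constructor definitions quoted in the tests; the erase condition is a paraphrase of this comment, not the actual structure_for_processing code:

```python
rows_per_segment = 100_000

# Bounds as defined in the CompactDataClause constructor (per the tests).
min_rows_per_segment = max((2 * rows_per_segment) // 3, 1)
max_rows_per_segment = max((4 * rows_per_segment) // 3, rows_per_segment + 1)
print(min_rows_per_segment, max_rows_per_segment)  # 66666 133333

# lib.write(sym, df_0) produced a single 10-row segment: it is the only
# row range and is within the maximum, so it is erased from
# ranges_and_keys and the compaction call writes nothing.
single_segment_rows = 10
print(single_segment_rows <= max_rows_per_segment)  # True
```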
getattr(lib, method)(arg_0, arg_1, prune_previous_version=arg)

assert len(lt.find_keys(KeyType.TABLE_INDEX)) == 1 if should_be_pruned else 2
assert len(lt.find_keys(KeyType.TABLE_INDEX)) == (1 if should_be_pruned else 2)
The operator-precedence fix is correct, but the noop problem flagged in the earlier comment still applies after this delta.
State at the point of the compaction call:
- lib.write(sym, df_0) — v0: 1 segment, 10 rows
- lib.append(sym, df_1, prune_previous_version=True) — v1: 2 segments, 20 rows total; v0 is pruned → 1 TABLE_INDEX key in store
compact_data_experimental("sym", 100_000, prune_previous_version=arg):
- rows_per_segment = 100_000, so min_rows_per_segment = max(2*100_000 // 3, 1) = 66_666
- Both segments (10 rows each) are well below the minimum, but combined they are also below the minimum → greedy algorithm emits a single range covering all 20 rows
- That single combined range exactly matches the existing set of segments (structure_for_processing returns an empty ranges_and_keys) → noop, no new version written
Result after the call: still 1 TABLE_INDEX key.
When should_be_pruned=False the assertion == (1 if should_be_pruned else 2) expects 2 keys but finds 1 → test fails.
To make the test exercise the prune-on-compaction path you need an input that forces an actual rewrite. The simplest fix is to use a small rows_per_segment so the 10-row segments are above max_rows_per_segment and must be split:
assert len(lt.find_keys(KeyType.TABLE_INDEX)) == (1 if should_be_pruned else 2)
# Use rows_per_segment=1 so that each 10-row segment exceeds max_rows_per_segment
# and compaction must perform a real rewrite, creating a new TABLE_INDEX version.
lib.append(sym, df_1, prune_previous_version=True)
arg_1 = 1  # rows_per_segment
ArcticDB Code Review Summary

PR: Enhancement/11460886943

Sections reviewed: API & Compatibility, Memory & Safety, Correctness, Code Quality, Testing, Build & Dependencies, Security, PR Title & Description, Outstanding Items Requiring Action.
@@ -27,7 +27,7 @@ inline std::vector<T> flatten_vectors(std::vector<std::vector<T>>&& vec_of_vecs)
std::vector<T> res;
res.reserve(res_size);
for (const auto& vec : vec_of_vecs) {
This must be a reference. Otherwise the use of move iterators below will default to copying.
inline arcticdb::HashedValue operator()(const arcticdb::pipelines::ColRange& col_range) const noexcept {
    return folly::hash::hash_combine(col_range.first, col_range.second);
}
};
Now that #include <folly/hash/Hash.h> is included, we can remove these two, as it handles this case.
Reference Issues/PRs
11460886943
What does this implement or fix?
Adds a new compact_data_experimental API for performing compaction of data keys. After compaction, every segment will have rows_per_segment ± 33% rows, with rows_per_segment defaulting to the lib config setting.

Currently limited to dense, numeric data with static schema. Will raise a SchemaException if any of these criteria are not met. These limitations will gradually be removed in future PRs.

Full profiling and optimisation will come later, but a basic smokescreen comparing to the existing defragment_symbol_data method was done to make sure there aren't any clowns in the car (all with default lib config slicing policies): defragment_symbol_data vs compact_data_experimental.

Note that in the last benchmark, defragment_symbol_data removes the column slicing.