cpp/arcticdb/util/key_utils.hpp
Outdated
```cpp
    }
}

auto read_results = folly::collectAll(read_futures).get();
```
The parallel reads in recurse_index_keys only parallelize the top-level index reads. When an input key is a MULTI_KEY, processing its segment at line 225 calls recurse_segment, which in turn calls recurse_index_key synchronously (via store->read_sync) for each nested TABLE_INDEX or MULTI_KEY found inside the multi-key segment. This means nested index reads remain sequential and block the calling thread (which may be the thread waiting on folly::collectAll(...).get()).
For typical delete_batch workloads with only TABLE_INDEX keys this is not a problem, but for snapshots or multi-key chains the speedup is incomplete, and deep nesting could block the thread pool for an extended period. This is worth a comment to set expectations, and potentially a future-work note.
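A toy model of the shape described above (illustrative names, Python threads standing in for folly futures, not the real ArcticDB API): only the top-level keys are submitted as tasks, and each task walks its nested children synchronously, mirroring recurse_segment calling store->read_sync.

```python
from concurrent.futures import ThreadPoolExecutor

def read_key_sync(key, order):
    # Stand-in for a synchronous store read; records the read order to show
    # that nested reads happen one at a time inside the owning task.
    order.append(key["id"])
    count = 1
    for child in key.get("children", []):
        count += read_key_sync(child, order)  # nested keys stay sequential
    return count

def recurse_index_keys(keys):
    order = []
    with ThreadPoolExecutor() as pool:
        # Only the top level is parallel, as in the reviewed change.
        futures = [pool.submit(read_key_sync, k, order) for k in keys]
        total = sum(f.result() for f in futures)  # like collectAll(...).get()
    return total, order

# A MULTI_KEY-style input: one top-level key with two nested index keys.
total, order = recurse_index_keys([
    {"id": "multi", "children": [{"id": "idx_a"}, {"id": "idx_b"}]},
])
```

With a single top-level key, all three reads run on one worker in depth-first order, which is exactly the incomplete speedup described above.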
```cpp
        log::version().debug("Data keys deleted.");
        return folly::Unit();
    });
remove_keys_fut = folly::collectAll(
```
Deleting column-stats keys, index keys, and data keys in parallel removes the previous ordering guarantee. The original sequential chain (column_stats → index → data) was presumably intentional: if a crash or partial failure occurs mid-delete, a reader that finds an index key can still reconstruct the data. With parallel deletion, it is possible for data keys to be removed while the index key still exists, leaving a dangling reference.
ArcticDB's ignores_missing_key_ flag on reads (read_opts) provides some crash-tolerance, but the new parallel ordering is a subtle correctness trade-off worth documenting. At minimum, add a comment here explaining why parallel deletion is safe (or acceptable) in this context.
Also note that folly::collectAll returns all results regardless of individual failures. If one of the three remove_keys calls fails, its error is captured in the corresponding Try<>, but the .thenValue lambda discards them all with auto&&. Errors from individual removal groups will be silently swallowed — the caller will see a successful folly::Unit. The previous sequential chain at least propagated the first failure. Consider checking each result in the .thenValue lambda:
```cpp
remove_keys_fut = folly::collectAll(
        store->remove_keys(std::move(vks_column_stats), remove_opts),
        store->remove_keys(std::move(vks_to_delete), remove_opts),
        store->remove_keys(std::move(vks_data_to_delete), remove_opts)
    )
    .via(&async::io_executor())
    .thenValue([](auto&& results) {
        // Re-throw first error, if any
        std::get<0>(results).throwUnlessValue();
        std::get<1>(results).throwUnlessValue();
        std::get<2>(results).throwUnlessValue();
        return folly::Unit();
    });
```
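The swallowing behaviour is easy to demonstrate with Python futures standing in for folly (toy names, not ArcticDB code): gathering all results without inspecting each one drops individual failures, while a throwUnlessValue-style check re-raises the first captured error.

```python
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor

def ok():
    return "done"

def boom():
    raise RuntimeError("remove_keys failed")

pool = ThreadPoolExecutor(max_workers=2)
futures = [pool.submit(ok), pool.submit(boom)]

# collectAll-style: every future completes; errors are captured inside the
# results and nothing is raised unless we look.
swallowed = [f for f in futures if f.exception() is not None]

def check_all(futs):
    # throwUnlessValue-style: re-raise the first captured error, if any.
    for f in futs:
        exc = f.exception()
        if exc is not None:
            raise exc

pool.shutdown()
```

Calling `check_all(futures)` raises the captured `RuntimeError`; without it, the failure sits silently in `swallowed`.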
```cpp
        async::submit_io_task(CheckReloadTask{store, version_map, min.first, load_strategy})
    );
}
auto results = folly::collectAll(reload_futures).get();
```
folly::collectAll(...).get() is called here on the calling thread. If this code is already executing inside a Folly thread pool callback (e.g., an IO or CPU task), calling .get() blocks that thread until all CheckReloadTask futures complete. This is the classic Folly deadlock pattern when the thread pool is saturated: all threads are blocked waiting for tasks that are queued but can never start.
The existing pattern elsewhere in version_map_batch_methods.hpp avoids .get() inside task chains. Please verify this call site is always reached from a non-pool thread (e.g., Python-side call path), or restructure to avoid the blocking .get() if it can be reached from within a pool thread.
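The saturation deadlock can be reproduced with a toy single-thread pool (Python stand-in for the folly executor; names are illustrative): a task already running on the pool submits a child task to the same pool and then blocks on it, so the child sits in the queue behind its blocked parent and can never start.

```python
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=1)  # a fully saturated pool

def parent_task():
    # Queued behind the currently running task -- with one worker it can
    # never start while we are still occupying that worker.
    child = pool.submit(lambda: 42)
    try:
        # The analogue of calling folly::collectAll(...).get() from inside a
        # pool thread; a timeout stands in for the real, unbounded block.
        return child.result(timeout=0.2)
    except concurrent.futures.TimeoutError:
        return "deadlock"

result = pool.submit(parent_task).result()
pool.shutdown()
print(result)  # -> deadlock
```

The same code run from a non-pool thread completes normally, which is why verifying the call graph (or restructuring to return a future instead of blocking) resolves the risk.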
cpp/arcticdb/util/key_utils.hpp
Outdated
```cpp
    }
}

auto read_results = folly::collectAll(read_futures).get();
```
Same deadlock risk as the check_reload change: folly::collectAll(read_futures).get() blocks the calling thread. recurse_index_keys is called from delete_trees_responsibly, which in turn is called in some paths from within future chains (e.g., delete_unreferenced_pruned_indexes at line 158 of local_versioned_engine.cpp is itself a .thenValue callback). If recurse_index_keys is reached from inside a thread pool thread, this .get() can deadlock when the pool is saturated.
Verify the full call graph to confirm this .get() is never reached from within a pool thread, or add a comment explaining why it is safe.
python/tests/conftest.py
Outdated
```python
@pytest.fixture(
    params=[True, False],
)
def check_single_threaded(request):
```
The fixture name check_single_threaded reads as though it checks (asserts) something about single-threadedness, but it actually configures the thread count. A name like single_threaded_config or maybe_single_threaded would better express its intent. The yield request.param also exposes the boolean param to tests that don't use it; tests that only need the configuration side-effect and don't use the yielded value may find this confusing.
python/tests/conftest.py
Outdated

```python
yield request.param

if request.param:
```
If the test body raises an exception, the teardown block (if request.param:) still runs. However, if reinit_task_scheduler() in the setup (line 144) itself throws, the yield is never reached, meaning teardown is also skipped — the config remains set at 1 thread for the rest of the test session. Consider using a try/finally pattern or a separate autouse fixture to ensure cleanup always happens.
```python
if os.path.exists(self.ARCTIC_DIR):
    rmtree(self.ARCTIC_DIR, ignore_errors=True)

def time_delete_batch_symbols(self, *args):
```
DeleteBatchVersions.setup prepares delete_requests to delete versions 0 through versions_per_symbol - 3 (versions 0-7 for versions_per_symbol=10), so both version versions_per_symbol - 2 (version 8) and the latest (version 9) survive. The comment says "all but the latest version", which is misleading: two versions are left, and the benchmark therefore doesn't exercise the "delete all but latest" scenario it claims to test. Minor, but it could confuse future benchmark readers.
```cpp
        store(), version_map, index_key, std::move(stream_update_info.previous_index_key_)
    }
)
.via(&async::cpu_executor())
```
Adding .via(&async::cpu_executor()) here routes the thenValue callback (which calls delete_unreferenced_pruned_indexes) onto the CPU thread pool. delete_unreferenced_pruned_indexes itself calls delete_trees_responsibly, which now contains folly::collectAll(...).get() calls (both for check_reload and for recurse_index_keys). Blocking inside a CPU pool callback is exactly the deadlock scenario described in guideline §10. This makes the deadlock risk from the two .get() calls more acute — please verify under load that the CPU pool cannot saturate.
ArcticDB Code Review Summary

PR: #2999 — Improve performance of delete_batch API & Compatibility

Areas reviewed: Memory & Safety, Correctness, Async & Threading, Testing, Benchmarks, Code Quality, PR Title & Description.

Summary: This commit is a one-line housekeeping fix. All previously flagged issues remain resolved; remaining open items are minor.
cpp/arcticdb/util/key_utils.hpp
Outdated
```cpp
for (const auto& index_key : keys) {
    same_stream_id = first_stream_id == index_key.id();
    if (first_stream_id != index_key.id()) {
        same_stream_id = false;
        break;
    }
}
```
nit: this can be collapsed into a single algorithm call, e.g.

```cpp
same_stream_id = std::ranges::adjacent_find(keys,
    [](const AtomKey& a, const AtomKey& b) { return a.id() != b.id(); }) == keys.end();
```
cpp/arcticdb/util/key_utils.hpp
Outdated
```cpp
// Process results sequentially
ankerl::unordered_dense::set<AtomKey> res;
ankerl::unordered_dense::set<AtomKeyPacked> res_packed;
for (size_t i = 0; i < read_results.size(); ++i) {
```
Looks like the body of the loop can be placed in a thenValue of the read future. Each future would have to use its own set and then all sets must be merged, but it seems possible. Do you mind trying?
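A rough sketch of that suggestion (stand-in names, Python threads instead of folly futures): each read task processes its own segment into a private set, and the per-task sets are merged once everything completes, instead of processing all reads sequentially on the calling thread.

```python
from concurrent.futures import ThreadPoolExecutor

def process_segment(segment):
    # Private result set per future (cf. res / res_packed in the loop above).
    return set(segment)

def gather_keys(segments):
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(process_segment, s) for s in segments]
        merged = set()
        for f in futures:
            merged |= f.result()  # merge per-task sets after completion
        return merged
```

Set union is associative and commutative, so the merge order does not affect the result, which is what makes moving the processing into each future's continuation safe.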
Reference Issues/PRs
Monday ticket ref: 11637143232
What does this implement or fix?
ArcticDB `delete_batch` Performance Optimization

Problem

`delete_batch` was up to 2x slower than a naive parallel multiprocessing approach (one process per symbol deletion).

Root Causes
Three sequential bottlenecks in the `delete_trees_responsibly` code path:

- `check_reload` -- the version map reload for each symbol was done in a loop, one at a time
- `recurse_index_keys` -- index segments were read one at a time to discover data keys
- `remove_keys` -- S3 `DeleteObjects` calls were batched by key type (3 groups), chained sequentially via `.thenValue`

Changes
1. Parallel version map reload (`local_versioned_engine.cpp`)

Replaced the sequential `version_map->check_reload()` loop with parallel IO tasks by submitting a `CheckReloadTask` per unique symbol to the IO thread pool.

2. Parallel index key recursion (`key_utils.hpp`) - main change, resulting in the ~2x improvement

Changed `recurse_index_keys` to read all index segments concurrently and then process the segments sequentially after all reads complete.

3. Parallel key deletion (`local_versioned_engine.cpp`)

Replaced the sequential `.thenValue` chain of 3 `remove_keys` calls (column_stats, index, data) to run all 3 concurrently:

- `remove_keys` submits a `RemoveBatchTask` to a separate IO thread
- `do_remove_impl`'s `groupBy` has a single iteration

Benchmark Results
`delete_batch` vs Parallel Multiprocessing

`delete_batch` measured 1.14x faster in one scenario and 1.99x faster in another; `delete_batch` wins in all measured cases.

All of the results are from testing on an EC2 machine against an S3 bucket in the same region, and the number of CPU threads was set to (number of symbols * number of CPU cores).
Any other comments?

After the change, `delete_batch` has about the same performance as `write_batch`.

Checklist
Checklist for code changes...