From 97014d0004d3578f09bdd71856dab3e6823c5f55 Mon Sep 17 00:00:00 2001 From: Phoebus Mak Date: Thu, 2 Apr 2026 13:01:31 +0100 Subject: [PATCH 1/4] Fix seg fault on folly exception --- cpp/arcticdb/arrow/test/test_arrow_write.cpp | 2 +- .../test/ingestion_stress_test.cpp | 6 +-- cpp/arcticdb/pipeline/pipeline_utils.hpp | 3 +- cpp/arcticdb/pipeline/read_frame.cpp | 22 ++++---- cpp/arcticdb/pipeline/read_frame.hpp | 4 +- cpp/arcticdb/storage/file/file_store.hpp | 2 +- .../storage/test/test_memory_storage.cpp | 2 +- .../version/local_versioned_engine.cpp | 19 +++---- .../version/local_versioned_engine.hpp | 4 +- cpp/arcticdb/version/python_bindings.cpp | 12 ++--- cpp/arcticdb/version/test/test_sparse.cpp | 26 +++++----- .../version/test/test_version_store.cpp | 22 ++++---- cpp/arcticdb/version/version_core.cpp | 24 ++++----- cpp/arcticdb/version/version_core.hpp | 6 +-- cpp/arcticdb/version/version_store_api.cpp | 6 +-- cpp/arcticdb/version/version_store_api.hpp | 6 +-- cpp/arcticdb/version/versioned_engine.hpp | 2 +- .../version_store/test_folly_exceptions.py | 51 +++++++++++++++++++ 18 files changed, 136 insertions(+), 83 deletions(-) create mode 100644 python/tests/unit/arcticdb/version_store/test_folly_exceptions.py diff --git a/cpp/arcticdb/arrow/test/test_arrow_write.cpp b/cpp/arcticdb/arrow/test/test_arrow_write.cpp index fe77c0406ec..5c003b155f4 100644 --- a/cpp/arcticdb/arrow/test/test_arrow_write.cpp +++ b/cpp/arcticdb/arrow/test/test_arrow_write.cpp @@ -370,7 +370,7 @@ TEST(ArrowWriteMemoryLifetime, InputFrameKeepsBufferAlive) { // Read back and verify auto read_query = std::make_shared(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = engine.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data); const auto& seg = read_result.root_.frame_and_descriptor_.frame_; diff --git a/cpp/arcticdb/column_store/test/ingestion_stress_test.cpp b/cpp/arcticdb/column_store/test/ingestion_stress_test.cpp index fe3f31d2de8..47823feae38 100644 --- a/cpp/arcticdb/column_store/test/ingestion_stress_test.cpp +++ b/cpp/arcticdb/column_store/test/ingestion_stress_test.cpp @@ -130,7 +130,7 @@ TEST_F(IngestionStressStore, ScalarIntAppend) { auto read_query = std::make_shared(); read_query->row_filter = universal_range(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, ro, handler_data); } @@ -222,7 +222,7 @@ TEST_F(IngestionStressStore, ScalarIntDynamicSchema) { auto read_query = std::make_shared(); read_query->row_filter = universal_range(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version_internal( symbol, VersionQuery{}, read_query, read_options, handler_data ); @@ -283,7 +283,7 @@ TEST_F(IngestionStressStore, DynamicSchemaWithStrings) { auto read_query = std::make_shared(); read_query->row_filter = universal_range(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, read_options, handler_data); } diff --git a/cpp/arcticdb/pipeline/pipeline_utils.hpp b/cpp/arcticdb/pipeline/pipeline_utils.hpp index b9295660e69..fd3145be34b 100644 --- a/cpp/arcticdb/pipeline/pipeline_utils.hpp +++ b/cpp/arcticdb/pipeline/pipeline_utils.hpp @@ -135,7 +135,8 @@ inline ReadResult read_result_from_single_frame( pipeline_context->begin()->set_string_pool(frame_and_desc.frame_.string_pool_ptr()); auto descriptor = std::make_shared(frame_and_desc.frame_.descriptor()); pipeline_context->begin()->set_descriptor(std::move(descriptor)); - reduce_and_fix_columns(pipeline_context, frame_and_desc.frame_, ReadOptions{}, handler_data).get(); + std::shared_ptr handler_data_ptr(std::shared_ptr{}, &handler_data); + reduce_and_fix_columns(pipeline_context, frame_and_desc.frame_, ReadOptions{}, handler_data_ptr).get(); apply_type_handlers(frame_and_desc.frame_, handler_data, output_format); return create_python_read_result(VersionedItem{key}, output_format, std::move(frame_and_desc)); } diff --git a/cpp/arcticdb/pipeline/read_frame.cpp b/cpp/arcticdb/pipeline/read_frame.cpp index badd9bf504d..0191fe985ce 100644 --- a/cpp/arcticdb/pipeline/read_frame.cpp +++ b/cpp/arcticdb/pipeline/read_frame.cpp @@ -847,14 +847,14 @@ class NullValueReducer { size_t pos_; size_t column_block_idx_; DecodePathData shared_data_; - std::any& handler_data_; + std::shared_ptr handler_data_; const OutputFormat output_format_; std::optional default_value_; public: NullValueReducer( Column& column, std::shared_ptr& context, SegmentInMemory frame, - DecodePathData shared_data, std::any& handler_data, OutputFormat output_format, + DecodePathData shared_data, std::shared_ptr handler_data, OutputFormat output_format, std::optional default_value = {} ) : column_(column), @@ -916,7 +916,7 @@ class NullValueReducer { handler) { auto type_size = data_type_size(column_.type()); handler->default_initialize( - column_.buffer(), start_row * type_size, num_rows * type_size, shared_data_, handler_data_ + column_.buffer(), start_row * type_size, num_rows * type_size, shared_data_, *handler_data_ ); } else if (output_format_ != OutputFormat::ARROW || default_value_.has_value()) { // Arrow does not care what values are in the main buffer where the validity bitmap is zero @@ -952,12 +952,12 @@ struct ReduceColumnTask : async::BaseTask { std::shared_ptr slice_map_; std::shared_ptr context_; DecodePathData shared_data_; - std::any& handler_data_; + std::shared_ptr handler_data_; ReadOptions read_options_; ReduceColumnTask( SegmentInMemory frame, size_t c, std::shared_ptr slice_map, - std::shared_ptr& context, DecodePathData shared_data, std::any& handler_data, + std::shared_ptr& context, DecodePathData shared_data, std::shared_ptr handler_data, const ReadOptions& read_options ) : frame_(std::move(frame)), @@ -1050,7 +1050,7 @@ struct ReduceColumnTask : async::BaseTask { folly::Future reduce_and_fix_columns( std::shared_ptr& context, SegmentInMemory& frame, const ReadOptions& read_options, - std::any& handler_data + std::shared_ptr handler_data ) { ARCTICDB_SAMPLE_DEFAULT(ReduceAndFixStringCol) ARCTICDB_DEBUG(log::version(), "Reduce and fix columns"); @@ -1074,7 +1074,7 @@ folly::Future reduce_and_fix_columns( static const auto batch_size = ConfigsMap::instance()->get_int("ReduceColumns.BatchSize", 100); return folly::collect(folly::window( std::move(fields_to_reduce), - [context, frame, slice_map, shared_data, read_options, &handler_data](size_t field + [context, frame, slice_map, shared_data, read_options, handler_data](size_t field ) mutable { return async::submit_cpu_task(ReduceColumnTask( frame, field, slice_map, context, shared_data, handler_data, read_options @@ -1089,7 +1089,7 @@ folly::Future reduce_and_fix_columns( folly::Future fetch_data( SegmentInMemory&& frame, const std::shared_ptr& context, const std::shared_ptr& ssource, const ReadQuery& read_query, - const ReadOptions& read_options, DecodePathData shared_data, std::any& handler_data + const ReadOptions& read_options, DecodePathData shared_data, std::shared_ptr handler_data ) { ARCTICDB_SAMPLE_DEFAULT(FetchSlices) if (frame.empty()) @@ -1108,17 +1108,17 @@ folly::Future fetch_data( frame = frame, dynamic_schema = dynamic_schema, shared_data, - &handler_data, + handler_data, read_query, read_options](auto&& ks) mutable { auto key_seg = std::forward(ks); if (dynamic_schema) { decode_into_frame_dynamic( - frame, row, key_seg, shared_data, handler_data, read_query, read_options + frame, row, key_seg, shared_data, *handler_data, read_query, read_options ); } else { decode_into_frame_static( - frame, row, key_seg, shared_data, handler_data, read_query, read_options + frame, row, key_seg, shared_data, *handler_data, read_query, read_options ); } diff --git a/cpp/arcticdb/pipeline/read_frame.hpp b/cpp/arcticdb/pipeline/read_frame.hpp index de2bbd4855b..ff234b7a1a2 100644 --- a/cpp/arcticdb/pipeline/read_frame.hpp +++ b/cpp/arcticdb/pipeline/read_frame.hpp @@ -69,7 +69,7 @@ void mark_index_slices(const std::shared_ptr& context); folly::Future fetch_data( SegmentInMemory&& frame, const std::shared_ptr& context, const std::shared_ptr& ssource, const ReadQuery& read_query, - const ReadOptions& read_options, DecodePathData shared_data, std::any& handler_data + const ReadOptions& read_options, DecodePathData shared_data, std::shared_ptr handler_data ); void decode_into_frame_static( @@ -86,7 +86,7 @@ void decode_into_frame_dynamic( folly::Future reduce_and_fix_columns( std::shared_ptr& context, SegmentInMemory& frame, const ReadOptions& read_options, - std::any& handler_data + std::shared_ptr handler_data ); StreamDescriptor get_filtered_descriptor( diff --git a/cpp/arcticdb/storage/file/file_store.hpp b/cpp/arcticdb/storage/file/file_store.hpp index 2d319c7cc59..2ef5478bda2 100644 --- a/cpp/arcticdb/storage/file/file_store.hpp +++ b/cpp/arcticdb/storage/file/file_store.hpp @@ -134,7 +134,7 @@ void write_dataframe_to_file_internal( version_store::ReadVersionOutput read_dataframe_from_file_internal( const StreamId& stream_id, const std::string& path, const std::shared_ptr& read_query, const ReadOptions& read_options, const arcticdb::proto::encoding::VariantCodec& codec_opts, - std::any& handler_data + std::shared_ptr handler_data ) { auto config = storage::file::pack_config(path, codec_opts); storage::LibraryPath lib_path{std::string{"file"}, fmt::format("{}", stream_id)}; diff --git a/cpp/arcticdb/storage/test/test_memory_storage.cpp b/cpp/arcticdb/storage/test/test_memory_storage.cpp index dc292351f58..1ae2b2d53a3 100644 --- a/cpp/arcticdb/storage/test/test_memory_storage.cpp +++ b/cpp/arcticdb/storage/test/test_memory_storage.cpp @@ -30,7 +30,7 @@ TEST(InMemory, ReadTwice) { auto read_query = std::make_shared(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result1 = version_store.read_dataframe_version_internal( symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data ); diff --git a/cpp/arcticdb/version/local_versioned_engine.cpp b/cpp/arcticdb/version/local_versioned_engine.cpp index b47627c3aff..ca89b925c22 100644 --- a/cpp/arcticdb/version/local_versioned_engine.cpp +++ b/cpp/arcticdb/version/local_versioned_engine.cpp @@ -430,7 +430,7 @@ VersionIdentifier get_version_identifier( ReadVersionWithNodesOutput LocalVersionedEngine::read_dataframe_version_internal( const StreamId& stream_id, const VersionQuery& version_query, const std::shared_ptr& read_query, - const ReadOptions& read_options, std::any& handler_data + const ReadOptions& read_options, std::shared_ptr handler_data ) { py::gil_scoped_release release_gil; const auto identifier = util::variant_match( @@ -1312,7 +1312,7 @@ VersionedItem LocalVersionedEngine::defragment_symbol_data( std::vector> LocalVersionedEngine::batch_read_internal( const std::vector& stream_ids, const std::vector& version_queries, std::vector>& read_queries, const BatchReadOptions& batch_read_options, - std::any& handler_data + std::shared_ptr handler_data ) { py::gil_scoped_release release_gil; if (stream_ids.empty()) { @@ -1336,7 +1336,7 @@ std::vector> LocalVersionedE read_query = read_queries.empty() ? std::make_shared() : read_queries[idx], read_options = batch_read_options.at(idx), - &handler_data](auto&& opt_index_key) { + handler_data](auto&& opt_index_key) { auto version_info = get_version_identifier( stream_ids[idx], version_queries[idx], @@ -1351,7 +1351,7 @@ std::vector> LocalVersionedE read_query = read_queries.empty() ? std::make_shared() : read_queries[idx], read_options = batch_read_options.at(idx), - &handler_data](ReadVersionOutput&& result) { + handler_data](ReadVersionOutput&& result) { auto& keys = result.frame_and_descriptor_.keys_; if (keys.empty()) { return folly::makeFuture(ReadVersionWithNodesOutput{std::move(result), {}}); @@ -1477,9 +1477,10 @@ MultiSymbolReadOutput LocalVersionedEngine::batch_read_and_join_internal( clause->set_component_manager(component_manager); } auto clauses_ptr = std::make_shared>>(std::move(clauses)); + std::shared_ptr handler_data_ptr(std::shared_ptr{}, &handler_data); return folly::collect(symbol_processing_result_futs) .via(&async::io_executor()) - .thenValueInline([this, &handler_data, clauses_ptr, component_manager, read_options]( + .thenValueInline([this, &handler_data_ptr, clauses_ptr, component_manager, read_options]( std::vector&& symbol_processing_results ) mutable { auto [input_schemas, entity_ids, res_versioned_items, res_metadatas] = @@ -1493,14 +1494,14 @@ MultiSymbolReadOutput LocalVersionedEngine::batch_read_and_join_internal( std::shared_ptr>(*component_manager, std::move(processed_entity_ids)); return collect_segments(std::move(proc)); }) - .thenValueInline([store = store(), &handler_data, pipeline_context, read_options]( + .thenValueInline([store = store(), handler_data_ptr, pipeline_context, read_options]( std::vector&& slice_and_keys ) mutable { return prepare_output_frame( - std::move(slice_and_keys), pipeline_context, store, read_options, handler_data + std::move(slice_and_keys), pipeline_context, store, read_options, handler_data_ptr ); }) - .thenValueInline([&handler_data, + .thenValueInline([&handler_data_ptr, pipeline_context, res_versioned_items, res_metadatas, @@ -1510,7 +1511,7 @@ MultiSymbolReadOutput LocalVersionedEngine::batch_read_and_join_internal( ReadOptions read_options_with_dynamic_schema = read_options.clone(); read_options_with_dynamic_schema.set_dynamic_schema(true); return reduce_and_fix_columns( - pipeline_context, frame, read_options_with_dynamic_schema, handler_data + pipeline_context, frame, read_options_with_dynamic_schema, handler_data_ptr ) .thenValueInline([pipeline_context, frame, diff --git a/cpp/arcticdb/version/local_versioned_engine.hpp b/cpp/arcticdb/version/local_versioned_engine.hpp index 8f864080784..1a6d050db15 100644 --- a/cpp/arcticdb/version/local_versioned_engine.hpp +++ b/cpp/arcticdb/version/local_versioned_engine.hpp @@ -157,7 +157,7 @@ class LocalVersionedEngine : public VersionedEngine { ReadVersionWithNodesOutput read_dataframe_version_internal( const StreamId& stream_id, const VersionQuery& version_query, const std::shared_ptr& read_query, - const ReadOptions& read_options, std::any& handler_data + const ReadOptions& read_options, std::shared_ptr handler_data ) override; VersionedItem read_modify_write_internal( @@ -289,7 +289,7 @@ class LocalVersionedEngine : public VersionedEngine { std::vector> batch_read_internal( const std::vector& stream_ids, const std::vector& version_queries, std::vector>& read_queries, const BatchReadOptions& batch_read_options, - std::any& handler_data + std::shared_ptr handler_data ); MultiSymbolReadOutput batch_read_and_join_internal( diff --git a/cpp/arcticdb/version/python_bindings.cpp b/cpp/arcticdb/version/python_bindings.cpp index 55460da06a2..3982eba66ff 100644 --- a/cpp/arcticdb/version/python_bindings.cpp +++ b/cpp/arcticdb/version/python_bindings.cpp @@ -233,9 +233,9 @@ void register_bindings(py::module& version, py::exception& read_query, const ReadOptions& read_options ) { - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(read_options.output_format()); + auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(read_options.output_format())); return adapt_read_df( - read_dataframe_from_file(sid, path, read_query, read_options, handler_data), &handler_data + read_dataframe_from_file(sid, path, read_query, read_options, handler_data), handler_data.get() ); } ); @@ -793,10 +793,10 @@ void register_bindings(py::module& version, py::exception& read_query, const ReadOptions& read_options) { auto handler_data = - TypeHandlerRegistry::instance()->get_handler_data(read_options.output_format()); + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(read_options.output_format())); return adapt_read_df( v.read_dataframe_version(sid, version_query, read_query, read_options, handler_data), - &handler_data + handler_data.get() ); }, py::call_guard(), @@ -939,12 +939,12 @@ void register_bindings(py::module& version, py::exception>& read_queries, const BatchReadOptions& batch_read_options) { auto handler_data = - TypeHandlerRegistry::instance()->get_handler_data(batch_read_options.output_format()); + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(batch_read_options.output_format())); return python_util::adapt_read_dfs( v.batch_read( stream_ids, version_queries, read_queries, batch_read_options, handler_data ), - &handler_data + handler_data.get() ); }, py::call_guard(), diff --git a/cpp/arcticdb/version/test/test_sparse.cpp b/cpp/arcticdb/version/test/test_sparse.cpp index c78c2151d79..4410c0dade3 100644 --- a/cpp/arcticdb/version/test/test_sparse.cpp +++ b/cpp/arcticdb/version/test/test_sparse.cpp @@ -83,7 +83,7 @@ TEST_F(SparseTestStore, SimpleRoundtrip) { auto read_query = std::make_shared(); read_query->row_filter = universal_range(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version( stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data ); @@ -173,7 +173,7 @@ TEST_F(SparseTestStore, SimpleRoundtripBackwardsCompat) { auto read_query = std::make_shared(); read_query->row_filter = universal_range(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version( stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data ); @@ -225,7 +225,7 @@ TEST_F(SparseTestStore, DenseToSparse) { auto read_query = std::make_shared(); read_query->row_filter = universal_range(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version( stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data ); @@ -276,12 +276,12 @@ TEST_F(SparseTestStore, SimpleRoundtripStrings) { read_options.set_incompletes(true); auto read_query = std::make_shared(); read_query->row_filter = universal_range(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::PANDAS); + auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::PANDAS)); auto read_result = test_store_->read_dataframe_version( stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data ); const auto& frame = std::get(read_result.frame_data).frame(); - apply_global_refcounts(handler_data, OutputFormat::PANDAS); + apply_global_refcounts(*handler_data, OutputFormat::PANDAS); ASSERT_EQ(frame.row_count(), 2); auto val1 = frame.scalar_at(0, 1); @@ -334,7 +334,7 @@ TEST_F(SparseTestStore, Multiblock) { auto read_query = std::make_shared(); read_query->row_filter = universal_range(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version( stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data ); @@ -391,7 +391,7 @@ TEST_F(SparseTestStore, Segment) { auto read_query = std::make_shared(); read_query->row_filter = universal_range(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version( stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data ); @@ -459,7 +459,7 @@ TEST_F(SparseTestStore, SegmentWithExistingIndex) { auto read_query = std::make_shared(); read_query->row_filter = universal_range(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version( stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data ); @@ -528,7 +528,7 @@ TEST_F(SparseTestStore, SegmentAndFilterColumn) { read_query->columns = {"time", "first"}; read_query->row_filter = universal_range(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version( stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data ); @@ -592,7 +592,7 @@ TEST_F(SparseTestStore, SegmentWithRangeFilter) { auto read_query = std::make_shared(); read_query->row_filter = IndexRange(timestamp{3000}, timestamp{6999}); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version( stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data ); @@ -652,7 +652,7 @@ TEST_F(SparseTestStore, Compact) { auto read_query = std::make_shared(); read_query->row_filter = universal_range(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version( stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data ); @@ -715,10 +715,10 @@ TEST_F(SparseTestStore, CompactWithStrings) { read_options.set_dynamic_schema(true); auto read_query = std::make_shared(); read_query->row_filter = universal_range(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::PANDAS); + auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::PANDAS)); auto read_result = test_store_->read_dataframe_version(stream_id, VersionQuery{}, read_query, read_options, handler_data); - apply_global_refcounts(handler_data, OutputFormat::PANDAS); + apply_global_refcounts(*handler_data, OutputFormat::PANDAS); const auto& frame = std::get(read_result.frame_data).frame(); ASSERT_EQ(frame.row_count(), num_rows); diff --git a/cpp/arcticdb/version/test/test_version_store.cpp b/cpp/arcticdb/version/test/test_version_store.cpp index 4fb7476d244..458501eed0f 100644 --- a/cpp/arcticdb/version/test/test_version_store.cpp +++ b/cpp/arcticdb/version/test/test_version_store.cpp @@ -261,7 +261,7 @@ TEST_F(VersionStoreTest, CompactIncompleteDynamicSchema) { auto vit = test_store_->compact_incomplete(symbol, false, false, true, false); auto read_query = std::make_shared(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data); const auto& seg = std::get(read_result.frame_data).frame(); @@ -348,7 +348,7 @@ TEST_F(VersionStoreTest, CompactIncompleteStaticSchemaIndexed) { auto vit = test_store_->compact_incomplete(symbol, false, false, true, false); auto read_query = std::make_shared(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data); const auto& seg = std::get(read_result.frame_data).frame(); @@ -430,7 +430,7 @@ TEST_F(VersionStoreTest, CompactIncompleteStaticSchemaRowCountIndex) { auto vit = test_store_->compact_incomplete(symbol, false, false, true, false); auto read_query = std::make_shared(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data); const auto& seg = std::get(read_result.frame_data).frame(); @@ -525,7 +525,7 @@ TEST_F(VersionStoreTest, StressBatchReadUncompressed) { BatchReadOptions batch_read_options(true); batch_read_options.set_output_format(OutputFormat::NATIVE); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(batch_read_options.output_format()); + auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(batch_read_options.output_format())); auto latest_versions = test_store_->batch_read( symbols, std::vector(10), read_queries, batch_read_options, handler_data ); @@ -720,7 +720,7 @@ TEST(VersionStore, UpdateWithin) { auto read_query = std::make_shared(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = version_store.read_dataframe_version_internal( symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data ); @@ -764,7 +764,7 @@ TEST(VersionStore, UpdateBefore) { auto read_query = std::make_shared(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = version_store.read_dataframe_version_internal( symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data ); @@ -808,7 +808,7 @@ TEST(VersionStore, UpdateAfter) { auto read_query = std::make_shared(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = version_store.read_dataframe_version_internal( symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data ); @@ -852,7 +852,7 @@ TEST(VersionStore, UpdateIntersectBefore) { auto read_query = std::make_shared(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = version_store.read_dataframe_version_internal( symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data ); @@ -896,7 +896,7 @@ TEST(VersionStore, UpdateIntersectAfter) { auto read_query = std::make_shared(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = version_store.read_dataframe_version_internal( symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data ); @@ -950,7 +950,7 @@ TEST(VersionStore, UpdateWithinSchemaChange) { read_options.set_dynamic_schema(true); auto read_query = std::make_shared(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = version_store.read_dataframe_version_internal( symbol, VersionQuery{}, read_query, read_options, handler_data ); @@ -1014,7 +1014,7 @@ TEST(VersionStore, UpdateWithinTypeAndSchemaChange) { read_options.set_dynamic_schema(true); auto read_query = std::make_shared(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = version_store.read_dataframe_version_internal( symbol, VersionQuery{}, read_query, read_options, handler_data ); diff --git a/cpp/arcticdb/version/version_core.cpp b/cpp/arcticdb/version/version_core.cpp index 7720a7b9fd3..ded4bc7a3d6 100644 --- a/cpp/arcticdb/version/version_core.cpp +++ b/cpp/arcticdb/version/version_core.cpp @@ -633,7 +633,7 @@ VersionedItem update_impl( folly::Future read_multi_key( const std::shared_ptr& store, const ReadOptions& read_options, const SegmentInMemory& index_key_seg, - std::any& handler_data, AtomKey&& key + std::shared_ptr handler_data, AtomKey&& key ) { std::vector keys; keys.reserve(index_key_seg.row_count()); @@ -1791,13 +1791,13 @@ struct CopyToBufferTask : async::BaseTask { FrameSlice frame_slice_; uint32_t required_fields_count_; DecodePathData shared_data_; - std::any& handler_data_; + std::shared_ptr handler_data_; const ReadOptions read_options_; std::shared_ptr pipeline_context_; CopyToBufferTask( SegmentInMemory&& source_segment, SegmentInMemory target_segment, FrameSlice frame_slice, - uint32_t required_fields_count, DecodePathData shared_data, std::any& handler_data, + uint32_t required_fields_count, DecodePathData shared_data, std::shared_ptr handler_data, const ReadOptions& read_options, std::shared_ptr pipeline_context ) : source_segment_(std::move(source_segment)), @@ -1827,7 +1827,7 @@ struct CopyToBufferTask : async::BaseTask { idx, frame_slice_.row_range, shared_data_, - handler_data_, + *handler_data_, read_options_, {} ); @@ -1853,7 +1853,7 @@ struct CopyToBufferTask : async::BaseTask { idx, frame_slice_.row_range, shared_data_, - handler_data_, + *handler_data_, read_options_, default_value ); @@ -1865,7 +1865,7 @@ struct CopyToBufferTask : async::BaseTask { folly::Future copy_segments_to_frame( const std::shared_ptr& store, const std::shared_ptr& pipeline_context, - SegmentInMemory frame, std::any& handler_data, const ReadOptions& read_options + SegmentInMemory frame, std::shared_ptr handler_data, const ReadOptions& read_options ) { const auto required_fields_count = pipelines::index::required_fields_count(pipeline_context->descriptor(), *pipeline_context->norm_meta_); @@ -1890,7 +1890,7 @@ folly::Future copy_segments_to_frame( folly::Future prepare_output_frame( std::vector&& items, const std::shared_ptr& pipeline_context, - const std::shared_ptr& store, const ReadOptions& read_options, std::any& handler_data + const std::shared_ptr& store, const ReadOptions& read_options, std::shared_ptr handler_data ) { pipeline_context->clear_vectors(); pipeline_context->slice_and_keys_ = std::move(items); @@ -2062,14 +2062,14 @@ ColumnStats get_column_stats_info_impl(const std::shared_ptr& store, cons folly::Future do_direct_read_or_process( const std::shared_ptr& store, const std::shared_ptr& read_query, const ReadOptions& read_options, const std::shared_ptr& pipeline_context, - const DecodePathData& shared_data, std::any& handler_data + const DecodePathData& shared_data, std::shared_ptr handler_data ) { const bool direct_read = read_query->clauses_.empty(); if (!direct_read) { ARCTICDB_SAMPLE(RunPipelineAndOutput, 0) util::check_rte(!pipeline_context->is_pickled(), "Cannot filter pickled data"); return read_process_and_collect(store, pipeline_context, read_query, read_options) - .thenValue([store, pipeline_context, read_options, &handler_data](std::vector&& segs) { + .thenValue([store, pipeline_context, read_options, handler_data](std::vector&& segs) { return prepare_output_frame(std::move(segs), pipeline_context, store, read_options, handler_data); }); } else { @@ -2728,11 +2728,11 @@ VersionedItem generate_result_versioned_item(const VersionIdentifier& version_in folly::Future read_frame_for_version( const std::shared_ptr& store, const VersionIdentifier& version_info, - const std::shared_ptr& read_query, const ReadOptions& read_options, std::any& handler_data + const std::shared_ptr& read_query, const ReadOptions& read_options, std::shared_ptr handler_data ) { return async::submit_io_task(SetupPipelineContextTask{store, version_info, read_query, read_options}) .via(&async::cpu_executor()) - .thenValue([store, read_query, read_options, version_info, &handler_data](auto&& pipeline_context) { + .thenValue([store, read_query, read_options, version_info, handler_data](auto&& pipeline_context) { auto res_versioned_item = generate_result_versioned_item(version_info); if (pipeline_context->multi_key_) { if (read_query) { @@ -2754,7 +2754,7 @@ folly::Future read_frame_for_version( .thenValue([res_versioned_item = std::move(res_versioned_item), pipeline_context, read_options, - &handler_data, + handler_data, read_query, shared_data](auto&& frame) mutable { ARCTICDB_DEBUG(log::version(), "Reduce and fix columns"); diff --git a/cpp/arcticdb/version/version_core.hpp b/cpp/arcticdb/version/version_core.hpp index d259ab6564a..c771c7196b3 100644 --- a/cpp/arcticdb/version/version_core.hpp +++ b/cpp/arcticdb/version/version_core.hpp @@ -89,7 +89,7 @@ ColumnStats get_column_stats_info_impl(const std::shared_ptr& store, cons folly::Future read_multi_key( const std::shared_ptr& store, const ReadOptions& read_options, const SegmentInMemory& index_key_seg, - std::any& handler_data + std::shared_ptr handler_data ); folly::Future> schedule_remaining_iterations( @@ -143,7 +143,7 @@ void add_index_columns_to_query(const ReadQuery& read_query, const TimeseriesDes folly::Future read_frame_for_version( const std::shared_ptr& store, const VersionIdentifier& version_info, - const std::shared_ptr& read_query, const ReadOptions& read_options, std::any& handler_data + const std::shared_ptr& read_query, const ReadOptions& read_options, std::shared_ptr handler_data ); folly::Future read_and_process( @@ -181,7 +181,7 @@ std::optional get_delete_keys_on_failure( folly::Future prepare_output_frame( std::vector&& items, const std::shared_ptr& pipeline_context, - const std::shared_ptr& store, const ReadOptions& read_options, std::any& handler_data + const std::shared_ptr& store, const ReadOptions& read_options, std::shared_ptr handler_data ); folly::Future read_modify_write_impl( diff --git a/cpp/arcticdb/version/version_store_api.cpp b/cpp/arcticdb/version/version_store_api.cpp index bebc9da98d3..11eae92c703 100644 --- a/cpp/arcticdb/version/version_store_api.cpp +++ b/cpp/arcticdb/version/version_store_api.cpp @@ -952,7 +952,7 @@ std::unordered_map PythonVersionStore::get_all_tombstoned_versi std::vector> PythonVersionStore::batch_read( const std::vector& stream_ids, const std::vector& version_queries, std::vector>& read_queries, const BatchReadOptions& batch_read_options, - std::any& handler_data + std::shared_ptr handler_data ) { auto read_versions_or_errors = @@ -1044,7 +1044,7 @@ void PythonVersionStore::delete_snapshot_sync(const SnapshotId& snap_name, const ReadResult PythonVersionStore::read_dataframe_version( const StreamId& stream_id, const VersionQuery& version_query, const std::shared_ptr& read_query, - const ReadOptions& read_options, std::any& handler_data + const ReadOptions& read_options, std::shared_ptr handler_data ) { auto opt_version_and_frame = @@ -1460,7 +1460,7 @@ void write_dataframe_to_file( ReadResult read_dataframe_from_file( const StreamId& stream_id, const std::string& path, const std::shared_ptr& read_query, - const ReadOptions& read_options, std::any& handler_data + const ReadOptions& read_options, std::shared_ptr handler_data ) { auto release_gil = std::make_unique(); diff --git a/cpp/arcticdb/version/version_store_api.hpp b/cpp/arcticdb/version/version_store_api.hpp index 54706ef9d34..1545a8f70cd 100644 --- a/cpp/arcticdb/version/version_store_api.hpp +++ b/cpp/arcticdb/version/version_store_api.hpp @@ -121,7 +121,7 @@ class PythonVersionStore : public LocalVersionedEngine { ReadResult read_dataframe_version( const StreamId& stream_id, const VersionQuery& version_query, const std::shared_ptr& read_query, - const ReadOptions& read_options, std::any& handler_data + const ReadOptions& read_options, std::shared_ptr handler_data ); VersionedItem read_modify_write( @@ -231,7 +231,7 @@ class PythonVersionStore : public LocalVersionedEngine { std::vector> batch_read( const std::vector& stream_ids, const std::vector& version_queries, std::vector>& read_queries, const BatchReadOptions& batch_read_options, - std::any& handler_data + std::shared_ptr handler_data ); std::vector batch_update( @@ -308,7 +308,7 @@ void write_dataframe_to_file( ReadResult read_dataframe_from_file( const StreamId& stream_id, const std::string& path, const std::shared_ptr& read_query, - const ReadOptions& read_options, std::any& handler_data + const ReadOptions& read_options, std::shared_ptr handler_data ); struct ManualClockVersionStore : PythonVersionStore { diff --git a/cpp/arcticdb/version/versioned_engine.hpp b/cpp/arcticdb/version/versioned_engine.hpp index 77780475b24..97ac7b6e82d 100644 --- a/cpp/arcticdb/version/versioned_engine.hpp +++ b/cpp/arcticdb/version/versioned_engine.hpp @@ -89,7 +89,7 @@ class VersionedEngine { virtual ReadVersionWithNodesOutput read_dataframe_version_internal( const StreamId& stream_id, const VersionQuery& version_query, const std::shared_ptr& read_query, - const ReadOptions& read_options, std::any& handler_data + const ReadOptions& read_options, std::shared_ptr handler_data ) = 0; virtual VersionedItem read_modify_write_internal( diff --git a/python/tests/unit/arcticdb/version_store/test_folly_exceptions.py b/python/tests/unit/arcticdb/version_store/test_folly_exceptions.py new file mode 100644 index 00000000000..9cad292deba --- /dev/null +++ b/python/tests/unit/arcticdb/version_store/test_folly_exceptions.py @@ -0,0 +1,51 @@ +import pandas as pd +import pytest + +from arcticdb.util.test import config_context_multi +from arcticdb_ext.storage import KeyType +import arcticdb_ext.cpp_async as adb_async + + +@pytest.fixture +def missing_tdata_lib(s3_store_factory, sym): + try: + with config_context_multi( + {"VersionStore.NumIOThreads": 100, "VersionStore.NumCPUThreads": 100} + ): + adb_async.reinit_task_scheduler() + assert adb_async.io_thread_count() == 100 + assert adb_async.cpu_thread_count() == 100 + + # dynamic_strings registers a PythonStringHandler, which causes handler_data + # to be accessed during segment decoding on IO threads. + lib = s3_store_factory(segment_row_size=1, dynamic_strings=True) + num_rows = 100 + + df = pd.DataFrame({"a": ["b"] * num_rows}, index=pd.date_range("2026-01-01", periods=num_rows, freq="s")) + lib.write(sym, df) + + lib_tool = lib.library_tool() + data_keys = lib_tool.find_keys_for_symbol(KeyType.TABLE_DATA, sym) + assert len(data_keys) == num_rows + + # Triggers an IO exception in a folly task to verify proper exception handling. + # folly::collect fails fast when a task raises an exception. If the caller's stack + # unwinds while other folly tasks are still running, they may access destroyed + # stack variables, causing a segfault + lib_tool.remove(data_keys[0]) + + yield lib, sym + finally: + adb_async.reinit_task_scheduler() + + +def test_read_with_missing_tdata_raises(missing_tdata_lib): + lib, sym = missing_tdata_lib + with pytest.raises(Exception): + lib.read(sym) + + +def test_batch_read_with_missing_tdata_raises(missing_tdata_lib): + lib, sym = missing_tdata_lib + with pytest.raises(Exception): + lib.batch_read([sym]) From a3a3b273d497154b9ea08b55234edfe292e54a45 Mon Sep 17 00:00:00 2001 From: Phoebus Mak <61957902+phoebusm@users.noreply.github.com> Date: Thu, 2 Apr 2026 13:38:41 +0100 Subject: [PATCH 2/4] Update cpp/arcticdb/version/local_versioned_engine.cpp Co-authored-by: claude[bot] <209825114+claude[bot]@users.noreply.github.com> --- cpp/arcticdb/version/local_versioned_engine.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/arcticdb/version/local_versioned_engine.cpp b/cpp/arcticdb/version/local_versioned_engine.cpp index ca89b925c22..61fb150233f 100644 --- a/cpp/arcticdb/version/local_versioned_engine.cpp +++ b/cpp/arcticdb/version/local_versioned_engine.cpp @@ -1480,7 +1480,7 @@ MultiSymbolReadOutput LocalVersionedEngine::batch_read_and_join_internal( std::shared_ptr handler_data_ptr(std::shared_ptr{}, &handler_data); return folly::collect(symbol_processing_result_futs) .via(&async::io_executor()) - .thenValueInline([this, &handler_data_ptr, clauses_ptr, component_manager, read_options]( + .thenValueInline([this, handler_data_ptr, clauses_ptr, component_manager, read_options]( std::vector&& symbol_processing_results ) mutable { auto [input_schemas, entity_ids, res_versioned_items, res_metadatas] = From f1988e1122dadc335c76a9ca0207a8cf01b6ae7a Mon Sep 17 00:00:00 2001 From: Phoebus Mak <61957902+phoebusm@users.noreply.github.com> Date: Thu, 2 Apr 2026 13:38:52 +0100 Subject: [PATCH 3/4] Update cpp/arcticdb/version/local_versioned_engine.cpp Co-authored-by: claude[bot] <209825114+claude[bot]@users.noreply.github.com> --- cpp/arcticdb/version/local_versioned_engine.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/arcticdb/version/local_versioned_engine.cpp b/cpp/arcticdb/version/local_versioned_engine.cpp index 61fb150233f..c35d94110c0 100644 --- a/cpp/arcticdb/version/local_versioned_engine.cpp +++ b/cpp/arcticdb/version/local_versioned_engine.cpp @@ -1501,7 +1501,7 @@ MultiSymbolReadOutput LocalVersionedEngine::batch_read_and_join_internal( std::move(slice_and_keys), pipeline_context, store, read_options, handler_data_ptr ); }) - .thenValueInline([&handler_data_ptr, + .thenValueInline([handler_data_ptr, pipeline_context, res_versioned_items, res_metadatas, From 1332d96b62331b02e5d7928ef865d9f528464abc Mon Sep 17 00:00:00 2001 From: phoebusm Date: Thu, 2 Apr 2026 13:43:20 +0100 Subject: [PATCH 4/4] Fix format --- cpp/arcticdb/arrow/test/test_arrow_write.cpp | 3 +- .../test/ingestion_stress_test.cpp | 9 +++-- cpp/arcticdb/pipeline/read_frame.cpp | 4 +-- .../storage/test/test_memory_storage.cpp | 3 +- cpp/arcticdb/version/python_bindings.cpp | 14 +++++--- cpp/arcticdb/version/test/test_sparse.cpp | 33 ++++++++++++------ .../version/test/test_version_store.cpp | 34 +++++++++++++------ cpp/arcticdb/version/version_core.cpp | 3 +- cpp/arcticdb/version/version_core.hpp | 3 +- .../version_store/test_folly_exceptions.py | 4 +-- 10 files changed, 71 insertions(+), 39 deletions(-) diff --git a/cpp/arcticdb/arrow/test/test_arrow_write.cpp b/cpp/arcticdb/arrow/test/test_arrow_write.cpp index 5c003b155f4..fec09904d60 100644 --- a/cpp/arcticdb/arrow/test/test_arrow_write.cpp +++ b/cpp/arcticdb/arrow/test/test_arrow_write.cpp @@ -370,7 +370,8 @@ TEST(ArrowWriteMemoryLifetime, InputFrameKeepsBufferAlive) { // Read back and verify auto read_query = std::make_shared(); register_native_handler_data_factory(); - auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = engine.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data); const auto& seg = read_result.root_.frame_and_descriptor_.frame_; diff --git a/cpp/arcticdb/column_store/test/ingestion_stress_test.cpp b/cpp/arcticdb/column_store/test/ingestion_stress_test.cpp index 47823feae38..4973538fc7b 100644 --- a/cpp/arcticdb/column_store/test/ingestion_stress_test.cpp +++ b/cpp/arcticdb/column_store/test/ingestion_stress_test.cpp @@ -130,7 +130,8 @@ TEST_F(IngestionStressStore, ScalarIntAppend) { auto read_query = std::make_shared(); read_query->row_filter = universal_range(); register_native_handler_data_factory(); - auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, ro, handler_data); } @@ -222,7 +223,8 @@ TEST_F(IngestionStressStore, ScalarIntDynamicSchema) { auto read_query = std::make_shared(); read_query->row_filter = universal_range(); register_native_handler_data_factory(); - auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version_internal( symbol, VersionQuery{}, read_query, read_options, handler_data ); @@ -283,7 +285,8 @@ TEST_F(IngestionStressStore, DynamicSchemaWithStrings) { auto read_query = std::make_shared(); read_query->row_filter = universal_range(); register_native_handler_data_factory(); - auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, read_options, handler_data); } diff --git a/cpp/arcticdb/pipeline/read_frame.cpp b/cpp/arcticdb/pipeline/read_frame.cpp index 0191fe985ce..4de2caaf3de 100644 --- a/cpp/arcticdb/pipeline/read_frame.cpp +++ b/cpp/arcticdb/pipeline/read_frame.cpp @@ -957,8 +957,8 @@ struct ReduceColumnTask : async::BaseTask { ReduceColumnTask( SegmentInMemory frame, size_t c, std::shared_ptr slice_map, - std::shared_ptr& context, DecodePathData shared_data, std::shared_ptr handler_data, - const ReadOptions& read_options + std::shared_ptr& context, DecodePathData shared_data, + std::shared_ptr handler_data, const ReadOptions& read_options ) : frame_(std::move(frame)), column_index_(c), diff --git a/cpp/arcticdb/storage/test/test_memory_storage.cpp b/cpp/arcticdb/storage/test/test_memory_storage.cpp index 1ae2b2d53a3..e4619a310ff 100644 --- a/cpp/arcticdb/storage/test/test_memory_storage.cpp +++ b/cpp/arcticdb/storage/test/test_memory_storage.cpp @@ -30,7 +30,8 @@ TEST(InMemory, ReadTwice) { auto read_query = std::make_shared(); register_native_handler_data_factory(); - auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result1 = version_store.read_dataframe_version_internal( symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data ); diff --git a/cpp/arcticdb/version/python_bindings.cpp b/cpp/arcticdb/version/python_bindings.cpp index 3982eba66ff..8658bc4a66d 100644 --- a/cpp/arcticdb/version/python_bindings.cpp +++ b/cpp/arcticdb/version/python_bindings.cpp @@ -233,7 +233,9 @@ void register_bindings(py::module& version, py::exception& read_query, const ReadOptions& read_options ) { - auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(read_options.output_format())); + auto handler_data = std::make_shared( + TypeHandlerRegistry::instance()->get_handler_data(read_options.output_format()) + ); return adapt_read_df( read_dataframe_from_file(sid, path, read_query, read_options, handler_data), handler_data.get() ); @@ -792,8 +794,9 @@ void register_bindings(py::module& version, py::exception& read_query, const ReadOptions& read_options) { - auto handler_data = - std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(read_options.output_format())); + auto handler_data = std::make_shared( + TypeHandlerRegistry::instance()->get_handler_data(read_options.output_format()) + ); return adapt_read_df( v.read_dataframe_version(sid, version_query, read_query, read_options, handler_data), handler_data.get() @@ -938,8 +941,9 @@ void register_bindings(py::module& version, py::exception& version_queries, std::vector>& read_queries, const BatchReadOptions& batch_read_options) { - auto handler_data = - std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(batch_read_options.output_format())); + auto handler_data = std::make_shared( + TypeHandlerRegistry::instance()->get_handler_data(batch_read_options.output_format()) + ); return python_util::adapt_read_dfs( v.batch_read( stream_ids, version_queries, read_queries, batch_read_options, handler_data diff --git a/cpp/arcticdb/version/test/test_sparse.cpp b/cpp/arcticdb/version/test/test_sparse.cpp index 4410c0dade3..22336129920 100644 --- a/cpp/arcticdb/version/test/test_sparse.cpp +++ b/cpp/arcticdb/version/test/test_sparse.cpp @@ -83,7 +83,8 @@ TEST_F(SparseTestStore, SimpleRoundtrip) { auto read_query = std::make_shared(); read_query->row_filter = universal_range(); register_native_handler_data_factory(); - auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version( stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data ); @@ -173,7 +174,8 @@ TEST_F(SparseTestStore, SimpleRoundtripBackwardsCompat) { auto read_query = std::make_shared(); read_query->row_filter = universal_range(); register_native_handler_data_factory(); - auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version( stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data ); @@ -225,7 +227,8 @@ TEST_F(SparseTestStore, DenseToSparse) { auto read_query = std::make_shared(); read_query->row_filter = universal_range(); register_native_handler_data_factory(); - auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version( stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data ); @@ -276,7 +279,8 @@ TEST_F(SparseTestStore, SimpleRoundtripStrings) { read_options.set_incompletes(true); auto read_query = std::make_shared(); read_query->row_filter = universal_range(); - auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::PANDAS)); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::PANDAS)); auto read_result = test_store_->read_dataframe_version( stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data ); @@ -334,7 +338,8 @@ TEST_F(SparseTestStore, Multiblock) { auto read_query = std::make_shared(); read_query->row_filter = universal_range(); register_native_handler_data_factory(); - auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version( stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data ); @@ -391,7 +396,8 @@ TEST_F(SparseTestStore, Segment) { auto read_query = std::make_shared(); read_query->row_filter = universal_range(); register_native_handler_data_factory(); - auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version( stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data ); @@ -459,7 +465,8 @@ TEST_F(SparseTestStore, SegmentWithExistingIndex) { auto read_query = std::make_shared(); read_query->row_filter = universal_range(); register_native_handler_data_factory(); - auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version( stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data ); @@ -528,7 +535,8 @@ TEST_F(SparseTestStore, SegmentAndFilterColumn) { read_query->columns = {"time", "first"}; read_query->row_filter = universal_range(); register_native_handler_data_factory(); - auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version( stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data ); @@ -592,7 +600,8 @@ TEST_F(SparseTestStore, SegmentWithRangeFilter) { auto read_query = std::make_shared(); read_query->row_filter = IndexRange(timestamp{3000}, timestamp{6999}); register_native_handler_data_factory(); - auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version( stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data ); @@ -652,7 +661,8 @@ TEST_F(SparseTestStore, Compact) { auto read_query = std::make_shared(); read_query->row_filter = universal_range(); register_native_handler_data_factory(); - auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version( stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data ); @@ -715,7 +725,8 @@ TEST_F(SparseTestStore, CompactWithStrings) { read_options.set_dynamic_schema(true); auto read_query = std::make_shared(); read_query->row_filter = universal_range(); - auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::PANDAS)); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::PANDAS)); auto read_result = test_store_->read_dataframe_version(stream_id, VersionQuery{}, read_query, read_options, handler_data); apply_global_refcounts(*handler_data, OutputFormat::PANDAS); diff --git a/cpp/arcticdb/version/test/test_version_store.cpp b/cpp/arcticdb/version/test/test_version_store.cpp index 458501eed0f..629d23b9c4b 100644 --- a/cpp/arcticdb/version/test/test_version_store.cpp +++ b/cpp/arcticdb/version/test/test_version_store.cpp @@ -261,7 +261,8 @@ TEST_F(VersionStoreTest, CompactIncompleteDynamicSchema) { auto vit = test_store_->compact_incomplete(symbol, false, false, true, false); auto read_query = std::make_shared(); register_native_handler_data_factory(); - auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data); const auto& seg = std::get(read_result.frame_data).frame(); @@ -348,7 +349,8 @@ TEST_F(VersionStoreTest, CompactIncompleteStaticSchemaIndexed) { auto vit = test_store_->compact_incomplete(symbol, false, false, true, false); auto read_query = std::make_shared(); register_native_handler_data_factory(); - auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data); const auto& seg = std::get(read_result.frame_data).frame(); @@ -430,7 +432,8 @@ TEST_F(VersionStoreTest, CompactIncompleteStaticSchemaRowCountIndex) { auto vit = test_store_->compact_incomplete(symbol, false, false, true, false); auto read_query = std::make_shared(); register_native_handler_data_factory(); - auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data); const auto& seg = std::get(read_result.frame_data).frame(); @@ -525,7 +528,9 @@ TEST_F(VersionStoreTest, StressBatchReadUncompressed) { BatchReadOptions batch_read_options(true); batch_read_options.set_output_format(OutputFormat::NATIVE); register_native_handler_data_factory(); - auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(batch_read_options.output_format())); + auto handler_data = std::make_shared( + TypeHandlerRegistry::instance()->get_handler_data(batch_read_options.output_format()) + ); auto latest_versions = test_store_->batch_read( symbols, std::vector(10), read_queries, batch_read_options, handler_data ); @@ -720,7 +725,8 @@ TEST(VersionStore, UpdateWithin) { auto read_query = std::make_shared(); register_native_handler_data_factory(); - auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = version_store.read_dataframe_version_internal( symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data ); @@ -764,7 +770,8 @@ TEST(VersionStore, UpdateBefore) { auto read_query = std::make_shared(); register_native_handler_data_factory(); - auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = version_store.read_dataframe_version_internal( symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data ); @@ -808,7 +815,8 @@ TEST(VersionStore, UpdateAfter) { auto read_query = std::make_shared(); register_native_handler_data_factory(); - auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = version_store.read_dataframe_version_internal( symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data ); @@ -852,7 +860,8 @@ TEST(VersionStore, UpdateIntersectBefore) { auto read_query = std::make_shared(); register_native_handler_data_factory(); - auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = version_store.read_dataframe_version_internal( symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data ); @@ -896,7 +905,8 @@ TEST(VersionStore, UpdateIntersectAfter) { auto read_query = std::make_shared(); register_native_handler_data_factory(); - auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = version_store.read_dataframe_version_internal( symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data ); @@ -950,7 +960,8 @@ TEST(VersionStore, UpdateWithinSchemaChange) { read_options.set_dynamic_schema(true); auto read_query = std::make_shared(); register_native_handler_data_factory(); - auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = version_store.read_dataframe_version_internal( symbol, VersionQuery{}, read_query, read_options, handler_data ); @@ -1014,7 +1025,8 @@ TEST(VersionStore, UpdateWithinTypeAndSchemaChange) { read_options.set_dynamic_schema(true); auto read_query = std::make_shared(); register_native_handler_data_factory(); - auto handler_data = std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = version_store.read_dataframe_version_internal( symbol, VersionQuery{}, read_query, read_options, handler_data ); diff --git a/cpp/arcticdb/version/version_core.cpp b/cpp/arcticdb/version/version_core.cpp index ded4bc7a3d6..dcaf78618de 100644 --- a/cpp/arcticdb/version/version_core.cpp +++ b/cpp/arcticdb/version/version_core.cpp @@ -2728,7 +2728,8 @@ VersionedItem generate_result_versioned_item(const VersionIdentifier& version_in folly::Future read_frame_for_version( const std::shared_ptr& store, const VersionIdentifier& version_info, - const std::shared_ptr& read_query, const ReadOptions& read_options, std::shared_ptr handler_data + const std::shared_ptr& read_query, const ReadOptions& read_options, + std::shared_ptr handler_data ) { return async::submit_io_task(SetupPipelineContextTask{store, version_info, read_query, read_options}) .via(&async::cpu_executor()) diff --git a/cpp/arcticdb/version/version_core.hpp b/cpp/arcticdb/version/version_core.hpp index c771c7196b3..27789f8ed38 100644 --- a/cpp/arcticdb/version/version_core.hpp +++ b/cpp/arcticdb/version/version_core.hpp @@ -143,7 +143,8 @@ void add_index_columns_to_query(const ReadQuery& read_query, const TimeseriesDes folly::Future read_frame_for_version( const std::shared_ptr& store, const VersionIdentifier& version_info, - const std::shared_ptr& read_query, const ReadOptions& read_options, std::shared_ptr handler_data + const std::shared_ptr& read_query, const ReadOptions& read_options, + std::shared_ptr handler_data ); folly::Future read_and_process( diff --git a/python/tests/unit/arcticdb/version_store/test_folly_exceptions.py b/python/tests/unit/arcticdb/version_store/test_folly_exceptions.py index 9cad292deba..3b83ac59f54 100644 --- a/python/tests/unit/arcticdb/version_store/test_folly_exceptions.py +++ b/python/tests/unit/arcticdb/version_store/test_folly_exceptions.py @@ -9,9 +9,7 @@ @pytest.fixture def missing_tdata_lib(s3_store_factory, sym): try: - with config_context_multi( - {"VersionStore.NumIOThreads": 100, "VersionStore.NumCPUThreads": 100} - ): + with config_context_multi({"VersionStore.NumIOThreads": 100, "VersionStore.NumCPUThreads": 100}): adb_async.reinit_task_scheduler() assert adb_async.io_thread_count() == 100 assert adb_async.cpu_thread_count() == 100