diff --git a/cpp/arcticdb/arrow/test/test_arrow_write.cpp b/cpp/arcticdb/arrow/test/test_arrow_write.cpp index fe77c0406ec..fec09904d60 100644 --- a/cpp/arcticdb/arrow/test/test_arrow_write.cpp +++ b/cpp/arcticdb/arrow/test/test_arrow_write.cpp @@ -370,7 +370,8 @@ TEST(ArrowWriteMemoryLifetime, InputFrameKeepsBufferAlive) { // Read back and verify auto read_query = std::make_shared(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = engine.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data); const auto& seg = read_result.root_.frame_and_descriptor_.frame_; diff --git a/cpp/arcticdb/column_store/test/ingestion_stress_test.cpp b/cpp/arcticdb/column_store/test/ingestion_stress_test.cpp index fe3f31d2de8..4973538fc7b 100644 --- a/cpp/arcticdb/column_store/test/ingestion_stress_test.cpp +++ b/cpp/arcticdb/column_store/test/ingestion_stress_test.cpp @@ -130,7 +130,8 @@ TEST_F(IngestionStressStore, ScalarIntAppend) { auto read_query = std::make_shared(); read_query->row_filter = universal_range(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, ro, handler_data); } @@ -222,7 +223,8 @@ TEST_F(IngestionStressStore, ScalarIntDynamicSchema) { auto read_query = std::make_shared(); read_query->row_filter = universal_range(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version_internal( symbol, VersionQuery{}, read_query, read_options, handler_data ); @@ -283,7 +285,8 @@ TEST_F(IngestionStressStore, DynamicSchemaWithStrings) { auto read_query = std::make_shared(); read_query->row_filter = universal_range(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, read_options, handler_data); } diff --git a/cpp/arcticdb/pipeline/pipeline_utils.hpp b/cpp/arcticdb/pipeline/pipeline_utils.hpp index b9295660e69..fd3145be34b 100644 --- a/cpp/arcticdb/pipeline/pipeline_utils.hpp +++ b/cpp/arcticdb/pipeline/pipeline_utils.hpp @@ -135,7 +135,8 @@ inline ReadResult read_result_from_single_frame( pipeline_context->begin()->set_string_pool(frame_and_desc.frame_.string_pool_ptr()); auto descriptor = std::make_shared(frame_and_desc.frame_.descriptor()); pipeline_context->begin()->set_descriptor(std::move(descriptor)); - reduce_and_fix_columns(pipeline_context, frame_and_desc.frame_, ReadOptions{}, handler_data).get(); + std::shared_ptr handler_data_ptr(std::shared_ptr{}, &handler_data); + reduce_and_fix_columns(pipeline_context, frame_and_desc.frame_, ReadOptions{}, handler_data_ptr).get(); apply_type_handlers(frame_and_desc.frame_, handler_data, output_format); return create_python_read_result(VersionedItem{key}, output_format, std::move(frame_and_desc)); } diff --git a/cpp/arcticdb/pipeline/read_frame.cpp b/cpp/arcticdb/pipeline/read_frame.cpp index badd9bf504d..4de2caaf3de 100644 --- a/cpp/arcticdb/pipeline/read_frame.cpp +++ b/cpp/arcticdb/pipeline/read_frame.cpp @@ -847,14 +847,14 @@ class NullValueReducer { size_t pos_; size_t column_block_idx_; DecodePathData shared_data_; - std::any& handler_data_; + std::shared_ptr handler_data_; const OutputFormat output_format_; std::optional default_value_; public: NullValueReducer( Column& column, std::shared_ptr& context, SegmentInMemory frame, - DecodePathData shared_data, std::any& handler_data, OutputFormat output_format, + DecodePathData shared_data, std::shared_ptr handler_data, OutputFormat output_format, std::optional default_value = {} ) : column_(column), @@ -916,7 +916,7 @@ class NullValueReducer { handler) { auto type_size = data_type_size(column_.type()); handler->default_initialize( - column_.buffer(), start_row * type_size, num_rows * type_size, shared_data_, handler_data_ + column_.buffer(), start_row * type_size, num_rows * type_size, shared_data_, *handler_data_ ); } else if (output_format_ != OutputFormat::ARROW || default_value_.has_value()) { // Arrow does not care what values are in the main buffer where the validity bitmap is zero @@ -952,13 +952,13 @@ struct ReduceColumnTask : async::BaseTask { std::shared_ptr slice_map_; std::shared_ptr context_; DecodePathData shared_data_; - std::any& handler_data_; + std::shared_ptr handler_data_; ReadOptions read_options_; ReduceColumnTask( SegmentInMemory frame, size_t c, std::shared_ptr slice_map, - std::shared_ptr& context, DecodePathData shared_data, std::any& handler_data, - const ReadOptions& read_options + std::shared_ptr& context, DecodePathData shared_data, + std::shared_ptr handler_data, const ReadOptions& read_options ) : frame_(std::move(frame)), column_index_(c), @@ -1050,7 +1050,7 @@ struct ReduceColumnTask : async::BaseTask { folly::Future reduce_and_fix_columns( std::shared_ptr& context, SegmentInMemory& frame, const ReadOptions& read_options, - std::any& handler_data + std::shared_ptr handler_data ) { ARCTICDB_SAMPLE_DEFAULT(ReduceAndFixStringCol) ARCTICDB_DEBUG(log::version(), "Reduce and fix columns"); @@ -1074,7 +1074,7 @@ folly::Future reduce_and_fix_columns( static const auto batch_size = ConfigsMap::instance()->get_int("ReduceColumns.BatchSize", 100); return folly::collect(folly::window( std::move(fields_to_reduce), - [context, frame, slice_map, shared_data, read_options, &handler_data](size_t field + [context, frame, slice_map, shared_data, read_options, handler_data](size_t field ) mutable { return async::submit_cpu_task(ReduceColumnTask( frame, field, slice_map, context, shared_data, handler_data, read_options @@ -1089,7 +1089,7 @@ folly::Future reduce_and_fix_columns( folly::Future fetch_data( SegmentInMemory&& frame, const std::shared_ptr& context, const std::shared_ptr& ssource, const ReadQuery& read_query, - const ReadOptions& read_options, DecodePathData shared_data, std::any& handler_data + const ReadOptions& read_options, DecodePathData shared_data, std::shared_ptr handler_data ) { ARCTICDB_SAMPLE_DEFAULT(FetchSlices) if (frame.empty()) @@ -1108,17 +1108,17 @@ folly::Future fetch_data( frame = frame, dynamic_schema = dynamic_schema, shared_data, - &handler_data, + handler_data, read_query, read_options](auto&& ks) mutable { auto key_seg = std::forward(ks); if (dynamic_schema) { decode_into_frame_dynamic( - frame, row, key_seg, shared_data, handler_data, read_query, read_options + frame, row, key_seg, shared_data, *handler_data, read_query, read_options ); } else { decode_into_frame_static( - frame, row, key_seg, shared_data, handler_data, read_query, read_options + frame, row, key_seg, shared_data, *handler_data, read_query, read_options ); } diff --git a/cpp/arcticdb/pipeline/read_frame.hpp b/cpp/arcticdb/pipeline/read_frame.hpp index de2bbd4855b..ff234b7a1a2 100644 --- a/cpp/arcticdb/pipeline/read_frame.hpp +++ b/cpp/arcticdb/pipeline/read_frame.hpp @@ -69,7 +69,7 @@ void mark_index_slices(const std::shared_ptr& context); folly::Future fetch_data( SegmentInMemory&& frame, const std::shared_ptr& context, const std::shared_ptr& ssource, const ReadQuery& read_query, - const ReadOptions& read_options, DecodePathData shared_data, std::any& handler_data + const ReadOptions& read_options, DecodePathData shared_data, std::shared_ptr handler_data ); void decode_into_frame_static( @@ -86,7 +86,7 @@ void decode_into_frame_dynamic( folly::Future reduce_and_fix_columns( std::shared_ptr& context, SegmentInMemory& frame, const ReadOptions& read_options, - std::any& handler_data + std::shared_ptr handler_data ); StreamDescriptor get_filtered_descriptor( diff --git a/cpp/arcticdb/storage/file/file_store.hpp b/cpp/arcticdb/storage/file/file_store.hpp index 2d319c7cc59..2ef5478bda2 100644 --- a/cpp/arcticdb/storage/file/file_store.hpp +++ b/cpp/arcticdb/storage/file/file_store.hpp @@ -134,7 +134,7 @@ void write_dataframe_to_file_internal( version_store::ReadVersionOutput read_dataframe_from_file_internal( const StreamId& stream_id, const std::string& path, const std::shared_ptr& read_query, const ReadOptions& read_options, const arcticdb::proto::encoding::VariantCodec& codec_opts, - std::any& handler_data + std::shared_ptr handler_data ) { auto config = storage::file::pack_config(path, codec_opts); storage::LibraryPath lib_path{std::string{"file"}, fmt::format("{}", stream_id)}; diff --git a/cpp/arcticdb/storage/test/test_memory_storage.cpp b/cpp/arcticdb/storage/test/test_memory_storage.cpp index dc292351f58..e4619a310ff 100644 --- a/cpp/arcticdb/storage/test/test_memory_storage.cpp +++ b/cpp/arcticdb/storage/test/test_memory_storage.cpp @@ -30,7 +30,8 @@ TEST(InMemory, ReadTwice) { auto read_query = std::make_shared(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result1 = version_store.read_dataframe_version_internal( symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data ); diff --git a/cpp/arcticdb/version/local_versioned_engine.cpp b/cpp/arcticdb/version/local_versioned_engine.cpp index b47627c3aff..c35d94110c0 100644 --- a/cpp/arcticdb/version/local_versioned_engine.cpp +++ b/cpp/arcticdb/version/local_versioned_engine.cpp @@ -430,7 +430,7 @@ VersionIdentifier get_version_identifier( ReadVersionWithNodesOutput LocalVersionedEngine::read_dataframe_version_internal( const StreamId& stream_id, const VersionQuery& version_query, const std::shared_ptr& read_query, - const ReadOptions& read_options, std::any& handler_data + const ReadOptions& read_options, std::shared_ptr handler_data ) { py::gil_scoped_release release_gil; const auto identifier = util::variant_match( @@ -1312,7 +1312,7 @@ VersionedItem LocalVersionedEngine::defragment_symbol_data( std::vector> LocalVersionedEngine::batch_read_internal( const std::vector& stream_ids, const std::vector& version_queries, std::vector>& read_queries, const BatchReadOptions& batch_read_options, - std::any& handler_data + std::shared_ptr handler_data ) { py::gil_scoped_release release_gil; if (stream_ids.empty()) { @@ -1336,7 +1336,7 @@ std::vector> LocalVersionedE read_query = read_queries.empty() ? std::make_shared() : read_queries[idx], read_options = batch_read_options.at(idx), - &handler_data](auto&& opt_index_key) { + handler_data](auto&& opt_index_key) { auto version_info = get_version_identifier( stream_ids[idx], version_queries[idx], @@ -1351,7 +1351,7 @@ std::vector> LocalVersionedE read_query = read_queries.empty() ? std::make_shared() : read_queries[idx], read_options = batch_read_options.at(idx), - &handler_data](ReadVersionOutput&& result) { + handler_data](ReadVersionOutput&& result) { auto& keys = result.frame_and_descriptor_.keys_; if (keys.empty()) { return folly::makeFuture(ReadVersionWithNodesOutput{std::move(result), {}}); @@ -1477,9 +1477,10 @@ MultiSymbolReadOutput LocalVersionedEngine::batch_read_and_join_internal( clause->set_component_manager(component_manager); } auto clauses_ptr = std::make_shared>>(std::move(clauses)); + std::shared_ptr handler_data_ptr(std::shared_ptr{}, &handler_data); return folly::collect(symbol_processing_result_futs) .via(&async::io_executor()) - .thenValueInline([this, &handler_data, clauses_ptr, component_manager, read_options]( + .thenValueInline([this, handler_data_ptr, clauses_ptr, component_manager, read_options]( std::vector&& symbol_processing_results ) mutable { auto [input_schemas, entity_ids, res_versioned_items, res_metadatas] = @@ -1493,14 +1494,14 @@ MultiSymbolReadOutput LocalVersionedEngine::batch_read_and_join_internal( std::shared_ptr>(*component_manager, std::move(processed_entity_ids)); return collect_segments(std::move(proc)); }) - .thenValueInline([store = store(), &handler_data, pipeline_context, read_options]( + .thenValueInline([store = store(), handler_data_ptr, pipeline_context, read_options]( std::vector&& slice_and_keys ) mutable { return prepare_output_frame( - std::move(slice_and_keys), pipeline_context, store, read_options, handler_data + std::move(slice_and_keys), pipeline_context, store, read_options, handler_data_ptr ); }) - .thenValueInline([&handler_data, + .thenValueInline([handler_data_ptr, pipeline_context, res_versioned_items, res_metadatas, @@ -1510,7 +1511,7 @@ MultiSymbolReadOutput LocalVersionedEngine::batch_read_and_join_internal( ReadOptions read_options_with_dynamic_schema = read_options.clone(); read_options_with_dynamic_schema.set_dynamic_schema(true); return reduce_and_fix_columns( - pipeline_context, frame, read_options_with_dynamic_schema, handler_data + pipeline_context, frame, read_options_with_dynamic_schema, handler_data_ptr ) .thenValueInline([pipeline_context, frame, diff --git a/cpp/arcticdb/version/local_versioned_engine.hpp b/cpp/arcticdb/version/local_versioned_engine.hpp index 8f864080784..1a6d050db15 100644 --- a/cpp/arcticdb/version/local_versioned_engine.hpp +++ b/cpp/arcticdb/version/local_versioned_engine.hpp @@ -157,7 +157,7 @@ class LocalVersionedEngine : public VersionedEngine { ReadVersionWithNodesOutput read_dataframe_version_internal( const StreamId& stream_id, const VersionQuery& version_query, const std::shared_ptr& read_query, - const ReadOptions& read_options, std::any& handler_data + const ReadOptions& read_options, std::shared_ptr handler_data ) override; VersionedItem read_modify_write_internal( @@ -289,7 +289,7 @@ class LocalVersionedEngine : public VersionedEngine { std::vector> batch_read_internal( const std::vector& stream_ids, const std::vector& version_queries, std::vector>& read_queries, const BatchReadOptions& batch_read_options, - std::any& handler_data + std::shared_ptr handler_data ); MultiSymbolReadOutput batch_read_and_join_internal( diff --git a/cpp/arcticdb/version/python_bindings.cpp b/cpp/arcticdb/version/python_bindings.cpp index 55460da06a2..8658bc4a66d 100644 --- a/cpp/arcticdb/version/python_bindings.cpp +++ b/cpp/arcticdb/version/python_bindings.cpp @@ -233,9 +233,11 @@ void register_bindings(py::module& version, py::exception& read_query, const ReadOptions& read_options ) { - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(read_options.output_format()); + auto handler_data = std::make_shared( + TypeHandlerRegistry::instance()->get_handler_data(read_options.output_format()) + ); return adapt_read_df( - read_dataframe_from_file(sid, path, read_query, read_options, handler_data), &handler_data + read_dataframe_from_file(sid, path, read_query, read_options, handler_data), handler_data.get() ); } ); @@ -792,11 +794,12 @@ void register_bindings(py::module& version, py::exception& read_query, const ReadOptions& read_options) { - auto handler_data = - TypeHandlerRegistry::instance()->get_handler_data(read_options.output_format()); + auto handler_data = std::make_shared( + TypeHandlerRegistry::instance()->get_handler_data(read_options.output_format()) + ); return adapt_read_df( v.read_dataframe_version(sid, version_query, read_query, read_options, handler_data), - &handler_data + handler_data.get() ); }, py::call_guard(), @@ -938,13 +941,14 @@ void register_bindings(py::module& version, py::exception& version_queries, std::vector>& read_queries, const BatchReadOptions& batch_read_options) { - auto handler_data = - TypeHandlerRegistry::instance()->get_handler_data(batch_read_options.output_format()); + auto handler_data = std::make_shared( + TypeHandlerRegistry::instance()->get_handler_data(batch_read_options.output_format()) + ); return python_util::adapt_read_dfs( v.batch_read( stream_ids, version_queries, read_queries, batch_read_options, handler_data ), - &handler_data + handler_data.get() ); }, py::call_guard(), diff --git a/cpp/arcticdb/version/test/test_sparse.cpp b/cpp/arcticdb/version/test/test_sparse.cpp index c78c2151d79..22336129920 100644 --- a/cpp/arcticdb/version/test/test_sparse.cpp +++ b/cpp/arcticdb/version/test/test_sparse.cpp @@ -83,7 +83,8 @@ TEST_F(SparseTestStore, SimpleRoundtrip) { auto read_query = std::make_shared(); read_query->row_filter = universal_range(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version( stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data ); @@ -173,7 +174,8 @@ TEST_F(SparseTestStore, SimpleRoundtripBackwardsCompat) { auto read_query = std::make_shared(); read_query->row_filter = universal_range(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version( stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data ); @@ -225,7 +227,8 @@ TEST_F(SparseTestStore, DenseToSparse) { auto read_query = std::make_shared(); read_query->row_filter = universal_range(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version( stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data ); @@ -276,12 +279,13 @@ TEST_F(SparseTestStore, SimpleRoundtripStrings) { read_options.set_incompletes(true); auto read_query = std::make_shared(); read_query->row_filter = universal_range(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::PANDAS); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::PANDAS)); auto read_result = test_store_->read_dataframe_version( stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data ); const auto& frame = std::get(read_result.frame_data).frame(); - apply_global_refcounts(handler_data, OutputFormat::PANDAS); + apply_global_refcounts(*handler_data, OutputFormat::PANDAS); ASSERT_EQ(frame.row_count(), 2); auto val1 = frame.scalar_at(0, 1); @@ -334,7 +338,8 @@ TEST_F(SparseTestStore, Multiblock) { auto read_query = std::make_shared(); read_query->row_filter = universal_range(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version( stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data ); @@ -391,7 +396,8 @@ TEST_F(SparseTestStore, Segment) { auto read_query = std::make_shared(); read_query->row_filter = universal_range(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version( stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data ); @@ -459,7 +465,8 @@ TEST_F(SparseTestStore, SegmentWithExistingIndex) { auto read_query = std::make_shared(); read_query->row_filter = universal_range(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version( stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data ); @@ -528,7 +535,8 @@ TEST_F(SparseTestStore, SegmentAndFilterColumn) { read_query->columns = {"time", "first"}; read_query->row_filter = universal_range(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version( stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data ); @@ -592,7 +600,8 @@ TEST_F(SparseTestStore, SegmentWithRangeFilter) { auto read_query = std::make_shared(); read_query->row_filter = IndexRange(timestamp{3000}, timestamp{6999}); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version( stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data ); @@ -652,7 +661,8 @@ TEST_F(SparseTestStore, Compact) { auto read_query = std::make_shared(); read_query->row_filter = universal_range(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version( stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data ); @@ -715,10 +725,11 @@ TEST_F(SparseTestStore, CompactWithStrings) { read_options.set_dynamic_schema(true); auto read_query = std::make_shared(); read_query->row_filter = universal_range(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::PANDAS); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::PANDAS)); auto read_result = test_store_->read_dataframe_version(stream_id, VersionQuery{}, read_query, read_options, handler_data); - apply_global_refcounts(handler_data, OutputFormat::PANDAS); + apply_global_refcounts(*handler_data, OutputFormat::PANDAS); const auto& frame = std::get(read_result.frame_data).frame(); ASSERT_EQ(frame.row_count(), num_rows); diff --git a/cpp/arcticdb/version/test/test_version_store.cpp b/cpp/arcticdb/version/test/test_version_store.cpp index 4fb7476d244..629d23b9c4b 100644 --- a/cpp/arcticdb/version/test/test_version_store.cpp +++ b/cpp/arcticdb/version/test/test_version_store.cpp @@ -261,7 +261,8 @@ TEST_F(VersionStoreTest, CompactIncompleteDynamicSchema) { auto vit = test_store_->compact_incomplete(symbol, false, false, true, false); auto read_query = std::make_shared(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data); const auto& seg = std::get(read_result.frame_data).frame(); @@ -348,7 +349,8 @@ TEST_F(VersionStoreTest, CompactIncompleteStaticSchemaIndexed) { auto vit = test_store_->compact_incomplete(symbol, false, false, true, false); auto read_query = std::make_shared(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data); const auto& seg = std::get(read_result.frame_data).frame(); @@ -430,7 +432,8 @@ TEST_F(VersionStoreTest, CompactIncompleteStaticSchemaRowCountIndex) { auto vit = test_store_->compact_incomplete(symbol, false, false, true, false); auto read_query = std::make_shared(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data); const auto& seg = std::get(read_result.frame_data).frame(); @@ -525,7 +528,9 @@ TEST_F(VersionStoreTest, StressBatchReadUncompressed) { BatchReadOptions batch_read_options(true); batch_read_options.set_output_format(OutputFormat::NATIVE); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(batch_read_options.output_format()); + auto handler_data = std::make_shared( + TypeHandlerRegistry::instance()->get_handler_data(batch_read_options.output_format()) + ); auto latest_versions = test_store_->batch_read( symbols, std::vector(10), read_queries, batch_read_options, handler_data ); @@ -720,7 +725,8 @@ TEST(VersionStore, UpdateWithin) { auto read_query = std::make_shared(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = version_store.read_dataframe_version_internal( symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data ); @@ -764,7 +770,8 @@ TEST(VersionStore, UpdateBefore) { auto read_query = std::make_shared(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = version_store.read_dataframe_version_internal( symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data ); @@ -808,7 +815,8 @@ TEST(VersionStore, UpdateAfter) { auto read_query = std::make_shared(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = version_store.read_dataframe_version_internal( symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data ); @@ -852,7 +860,8 @@ TEST(VersionStore, UpdateIntersectBefore) { auto read_query = std::make_shared(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = version_store.read_dataframe_version_internal( symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data ); @@ -896,7 +905,8 @@ TEST(VersionStore, UpdateIntersectAfter) { auto read_query = std::make_shared(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = version_store.read_dataframe_version_internal( symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data ); @@ -950,7 +960,8 @@ TEST(VersionStore, UpdateWithinSchemaChange) { read_options.set_dynamic_schema(true); auto read_query = std::make_shared(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = version_store.read_dataframe_version_internal( symbol, VersionQuery{}, read_query, read_options, handler_data ); @@ -1014,7 +1025,8 @@ TEST(VersionStore, UpdateWithinTypeAndSchemaChange) { read_options.set_dynamic_schema(true); auto read_query = std::make_shared(); register_native_handler_data_factory(); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE); + auto handler_data = + std::make_shared(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE)); auto read_result = version_store.read_dataframe_version_internal( symbol, VersionQuery{}, read_query, read_options, handler_data ); diff --git a/cpp/arcticdb/version/version_core.cpp b/cpp/arcticdb/version/version_core.cpp index 7720a7b9fd3..dcaf78618de 100644 --- a/cpp/arcticdb/version/version_core.cpp +++ b/cpp/arcticdb/version/version_core.cpp @@ -633,7 +633,7 @@ VersionedItem update_impl( folly::Future read_multi_key( const std::shared_ptr& store, const ReadOptions& read_options, const SegmentInMemory& index_key_seg, - std::any& handler_data, AtomKey&& key + std::shared_ptr handler_data, AtomKey&& key ) { std::vector keys; keys.reserve(index_key_seg.row_count()); @@ -1791,13 +1791,13 @@ struct CopyToBufferTask : async::BaseTask { FrameSlice frame_slice_; uint32_t required_fields_count_; DecodePathData shared_data_; - std::any& handler_data_; + std::shared_ptr handler_data_; const ReadOptions read_options_; std::shared_ptr pipeline_context_; CopyToBufferTask( SegmentInMemory&& source_segment, SegmentInMemory target_segment, FrameSlice frame_slice, - uint32_t required_fields_count, DecodePathData shared_data, std::any& handler_data, + uint32_t required_fields_count, DecodePathData shared_data, std::shared_ptr handler_data, const ReadOptions& read_options, std::shared_ptr pipeline_context ) : source_segment_(std::move(source_segment)), @@ -1827,7 +1827,7 @@ struct CopyToBufferTask : async::BaseTask { idx, frame_slice_.row_range, shared_data_, - handler_data_, + *handler_data_, read_options_, {} ); @@ -1853,7 +1853,7 @@ struct CopyToBufferTask : async::BaseTask { idx, frame_slice_.row_range, shared_data_, - handler_data_, + *handler_data_, read_options_, default_value ); @@ -1865,7 +1865,7 @@ struct CopyToBufferTask : async::BaseTask { folly::Future copy_segments_to_frame( const std::shared_ptr& store, const std::shared_ptr& pipeline_context, - SegmentInMemory frame, std::any& handler_data, const ReadOptions& read_options + SegmentInMemory frame, std::shared_ptr handler_data, const ReadOptions& read_options ) { const auto required_fields_count = pipelines::index::required_fields_count(pipeline_context->descriptor(), *pipeline_context->norm_meta_); @@ -1890,7 +1890,7 @@ folly::Future copy_segments_to_frame( folly::Future prepare_output_frame( std::vector&& items, const std::shared_ptr& pipeline_context, - const std::shared_ptr& store, const ReadOptions& read_options, std::any& handler_data + const std::shared_ptr& store, const ReadOptions& read_options, std::shared_ptr handler_data ) { pipeline_context->clear_vectors(); pipeline_context->slice_and_keys_ = std::move(items); @@ -2062,14 +2062,14 @@ ColumnStats get_column_stats_info_impl(const std::shared_ptr& store, cons folly::Future do_direct_read_or_process( const std::shared_ptr& store, const std::shared_ptr& read_query, const ReadOptions& read_options, const std::shared_ptr& pipeline_context, - const DecodePathData& shared_data, std::any& handler_data + const DecodePathData& shared_data, std::shared_ptr handler_data ) { const bool direct_read = read_query->clauses_.empty(); if (!direct_read) { ARCTICDB_SAMPLE(RunPipelineAndOutput, 0) util::check_rte(!pipeline_context->is_pickled(), "Cannot filter pickled data"); return read_process_and_collect(store, pipeline_context, read_query, read_options) - .thenValue([store, pipeline_context, read_options, &handler_data](std::vector&& segs) { + .thenValue([store, pipeline_context, read_options, handler_data](std::vector&& segs) { return prepare_output_frame(std::move(segs), pipeline_context, store, read_options, handler_data); }); } else { @@ -2728,11 +2728,12 @@ VersionedItem generate_result_versioned_item(const VersionIdentifier& version_in folly::Future read_frame_for_version( const std::shared_ptr& store, const VersionIdentifier& version_info, - const std::shared_ptr& read_query, const ReadOptions& read_options, std::any& handler_data + const std::shared_ptr& read_query, const ReadOptions& read_options, + std::shared_ptr handler_data ) { return async::submit_io_task(SetupPipelineContextTask{store, version_info, read_query, read_options}) .via(&async::cpu_executor()) - .thenValue([store, read_query, read_options, version_info, &handler_data](auto&& pipeline_context) { + .thenValue([store, read_query, read_options, version_info, handler_data](auto&& pipeline_context) { auto res_versioned_item = generate_result_versioned_item(version_info); if (pipeline_context->multi_key_) { if (read_query) { @@ -2754,7 +2755,7 @@ folly::Future read_frame_for_version( .thenValue([res_versioned_item = std::move(res_versioned_item), pipeline_context, read_options, - &handler_data, + handler_data, read_query, shared_data](auto&& frame) mutable { ARCTICDB_DEBUG(log::version(), "Reduce and fix columns"); diff --git a/cpp/arcticdb/version/version_core.hpp b/cpp/arcticdb/version/version_core.hpp index d259ab6564a..27789f8ed38 100644 --- a/cpp/arcticdb/version/version_core.hpp +++ b/cpp/arcticdb/version/version_core.hpp @@ -89,7 +89,7 @@ ColumnStats get_column_stats_info_impl(const std::shared_ptr& store, cons folly::Future read_multi_key( const std::shared_ptr& store, const ReadOptions& read_options, const SegmentInMemory& index_key_seg, - std::any& handler_data + std::shared_ptr handler_data ); folly::Future> schedule_remaining_iterations( @@ -143,7 +143,8 @@ void add_index_columns_to_query(const ReadQuery& read_query, const TimeseriesDes folly::Future read_frame_for_version( const std::shared_ptr& store, const VersionIdentifier& version_info, - const std::shared_ptr& read_query, const ReadOptions& read_options, std::any& handler_data + const std::shared_ptr& read_query, const ReadOptions& read_options, + std::shared_ptr handler_data ); folly::Future read_and_process( @@ -181,7 +182,7 @@ std::optional get_delete_keys_on_failure( folly::Future prepare_output_frame( std::vector&& items, const std::shared_ptr& pipeline_context, - const std::shared_ptr& store, const ReadOptions& read_options, std::any& handler_data + const std::shared_ptr& store, const ReadOptions& read_options, std::shared_ptr handler_data ); folly::Future read_modify_write_impl( diff --git a/cpp/arcticdb/version/version_store_api.cpp b/cpp/arcticdb/version/version_store_api.cpp index bebc9da98d3..11eae92c703 100644 --- a/cpp/arcticdb/version/version_store_api.cpp +++ b/cpp/arcticdb/version/version_store_api.cpp @@ -952,7 +952,7 @@ std::unordered_map PythonVersionStore::get_all_tombstoned_versi std::vector> PythonVersionStore::batch_read( const std::vector& stream_ids, const std::vector& version_queries, std::vector>& read_queries, const BatchReadOptions& batch_read_options, - std::any& handler_data + std::shared_ptr handler_data ) { auto read_versions_or_errors = @@ -1044,7 +1044,7 @@ void PythonVersionStore::delete_snapshot_sync(const SnapshotId& snap_name, const ReadResult PythonVersionStore::read_dataframe_version( const StreamId& stream_id, const VersionQuery& version_query, const std::shared_ptr& read_query, - const ReadOptions& read_options, std::any& handler_data + const ReadOptions& read_options, std::shared_ptr handler_data ) { auto opt_version_and_frame = @@ -1460,7 +1460,7 @@ void write_dataframe_to_file( ReadResult read_dataframe_from_file( const StreamId& stream_id, const std::string& path, const std::shared_ptr& read_query, - const ReadOptions& read_options, std::any& handler_data + const ReadOptions& read_options, std::shared_ptr handler_data ) { auto release_gil = std::make_unique(); diff --git a/cpp/arcticdb/version/version_store_api.hpp b/cpp/arcticdb/version/version_store_api.hpp index 54706ef9d34..1545a8f70cd 100644 --- a/cpp/arcticdb/version/version_store_api.hpp +++ b/cpp/arcticdb/version/version_store_api.hpp @@ -121,7 +121,7 @@ class PythonVersionStore : public LocalVersionedEngine { ReadResult read_dataframe_version( const StreamId& stream_id, const VersionQuery& version_query, const std::shared_ptr& read_query, - const ReadOptions& read_options, std::any& handler_data + const ReadOptions& read_options, std::shared_ptr handler_data ); VersionedItem read_modify_write( @@ -231,7 +231,7 @@ class PythonVersionStore : public LocalVersionedEngine { std::vector> batch_read( const std::vector& stream_ids, const std::vector& version_queries, std::vector>& read_queries, const BatchReadOptions& batch_read_options, - std::any& handler_data + std::shared_ptr handler_data ); std::vector batch_update( @@ -308,7 +308,7 @@ void write_dataframe_to_file( ReadResult read_dataframe_from_file( const StreamId& stream_id, const std::string& path, const std::shared_ptr& read_query, - const ReadOptions& read_options, std::any& handler_data + const ReadOptions& read_options, std::shared_ptr handler_data ); struct ManualClockVersionStore : PythonVersionStore { diff --git a/cpp/arcticdb/version/versioned_engine.hpp b/cpp/arcticdb/version/versioned_engine.hpp index 77780475b24..97ac7b6e82d 100644 --- a/cpp/arcticdb/version/versioned_engine.hpp +++ b/cpp/arcticdb/version/versioned_engine.hpp @@ -89,7 +89,7 @@ class VersionedEngine { virtual ReadVersionWithNodesOutput read_dataframe_version_internal( const StreamId& stream_id, const VersionQuery& version_query, const std::shared_ptr& read_query, - const ReadOptions& read_options, std::any& handler_data + const ReadOptions& read_options, std::shared_ptr handler_data ) = 0; virtual VersionedItem read_modify_write_internal( diff --git a/python/tests/unit/arcticdb/version_store/test_folly_exceptions.py b/python/tests/unit/arcticdb/version_store/test_folly_exceptions.py new file mode 100644 index 00000000000..3b83ac59f54 --- /dev/null +++ b/python/tests/unit/arcticdb/version_store/test_folly_exceptions.py @@ -0,0 +1,49 @@ +import pandas as pd +import pytest + +from arcticdb.util.test import config_context_multi +from arcticdb_ext.storage import KeyType +import arcticdb_ext.cpp_async as adb_async + + +@pytest.fixture +def missing_tdata_lib(s3_store_factory, sym): + try: + with config_context_multi({"VersionStore.NumIOThreads": 100, "VersionStore.NumCPUThreads": 100}): + adb_async.reinit_task_scheduler() + assert adb_async.io_thread_count() == 100 + assert adb_async.cpu_thread_count() == 100 + + # dynamic_strings registers a PythonStringHandler, which causes handler_data + # to be accessed during segment decoding on IO threads. + lib = s3_store_factory(segment_row_size=1, dynamic_strings=True) + num_rows = 100 + + df = pd.DataFrame({"a": ["b"] * num_rows}, index=pd.date_range("2026-01-01", periods=num_rows, freq="s")) + lib.write(sym, df) + + lib_tool = lib.library_tool() + data_keys = lib_tool.find_keys_for_symbol(KeyType.TABLE_DATA, sym) + assert len(data_keys) == num_rows + + # Triggers an IO exception in a folly task to verify proper exception handling. + # folly::collect fails fast when a task raises an exception. If the caller's stack + # unwinds while other folly tasks are still running, they may access destroyed + # stack variables, causing a segfault + lib_tool.remove(data_keys[0]) + + yield lib, sym + finally: + adb_async.reinit_task_scheduler() + + +def test_read_with_missing_tdata_raises(missing_tdata_lib): + lib, sym = missing_tdata_lib + with pytest.raises(Exception): + lib.read(sym) + + +def test_batch_read_with_missing_tdata_raises(missing_tdata_lib): + lib, sym = missing_tdata_lib + with pytest.raises(Exception): + lib.batch_read([sym])