Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cpp/arcticdb/arrow/test/test_arrow_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,8 @@ TEST(ArrowWriteMemoryLifetime, InputFrameKeepsBufferAlive) {
// Read back and verify
auto read_query = std::make_shared<ReadQuery>();
register_native_handler_data_factory();
auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE);
auto handler_data =
std::make_shared<std::any>(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE));
auto read_result =
engine.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data);
const auto& seg = read_result.root_.frame_and_descriptor_.frame_;
Expand Down
9 changes: 6 additions & 3 deletions cpp/arcticdb/column_store/test/ingestion_stress_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ TEST_F(IngestionStressStore, ScalarIntAppend) {
auto read_query = std::make_shared<ReadQuery>();
read_query->row_filter = universal_range();
register_native_handler_data_factory();
auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE);
auto handler_data =
std::make_shared<std::any>(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE));
auto read_result = test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, ro, handler_data);
}

Expand Down Expand Up @@ -222,7 +223,8 @@ TEST_F(IngestionStressStore, ScalarIntDynamicSchema) {
auto read_query = std::make_shared<ReadQuery>();
read_query->row_filter = universal_range();
register_native_handler_data_factory();
auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE);
auto handler_data =
std::make_shared<std::any>(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE));
auto read_result = test_store_->read_dataframe_version_internal(
symbol, VersionQuery{}, read_query, read_options, handler_data
);
Expand Down Expand Up @@ -283,7 +285,8 @@ TEST_F(IngestionStressStore, DynamicSchemaWithStrings) {
auto read_query = std::make_shared<ReadQuery>();
read_query->row_filter = universal_range();
register_native_handler_data_factory();
auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE);
auto handler_data =
std::make_shared<std::any>(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE));
auto read_result =
test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, read_options, handler_data);
}
3 changes: 2 additions & 1 deletion cpp/arcticdb/pipeline/pipeline_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ inline ReadResult read_result_from_single_frame(
pipeline_context->begin()->set_string_pool(frame_and_desc.frame_.string_pool_ptr());
auto descriptor = std::make_shared<StreamDescriptor>(frame_and_desc.frame_.descriptor());
pipeline_context->begin()->set_descriptor(std::move(descriptor));
reduce_and_fix_columns(pipeline_context, frame_and_desc.frame_, ReadOptions{}, handler_data).get();
std::shared_ptr<std::any> handler_data_ptr(std::shared_ptr<std::any>{}, &handler_data);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

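This uses the std::shared_ptr aliasing constructor with an empty owner and a raw pointer to handler_data — in effect a non-owning shared_ptr that has no control block and will never delete anything — to adapt a reference into the new shared_ptr<std::any> API. Because .get() blocks on the returned future in the very next statement, the caller's frame (and therefore handler_data) is alive for the full duration of the asynchronous work, so this is safe here.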

The identical pattern is used in local_versioned_engine.cpp for batch_read_and_join_internal (also guarded by .get()). Both are fragile: removing .get() or copying the pattern without the blocking call would produce a dangling pointer.

Consider propagating std::shared_ptr<std::any> all the way through batch_read_and_join_internal and read_result_from_single_frame as well (matching the rest of the chain changed in this PR), which would eliminate the aliasing wrapper entirely and remove the implicit lifetime constraint.

reduce_and_fix_columns(pipeline_context, frame_and_desc.frame_, ReadOptions{}, handler_data_ptr).get();
apply_type_handlers(frame_and_desc.frame_, handler_data, output_format);
return create_python_read_result(VersionedItem{key}, output_format, std::move(frame_and_desc));
}
Expand Down
24 changes: 12 additions & 12 deletions cpp/arcticdb/pipeline/read_frame.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -847,14 +847,14 @@ class NullValueReducer {
size_t pos_;
size_t column_block_idx_;
DecodePathData shared_data_;
std::any& handler_data_;
std::shared_ptr<std::any> handler_data_;
const OutputFormat output_format_;
std::optional<Value> default_value_;

public:
NullValueReducer(
Column& column, std::shared_ptr<PipelineContext>& context, SegmentInMemory frame,
DecodePathData shared_data, std::any& handler_data, OutputFormat output_format,
DecodePathData shared_data, std::shared_ptr<std::any> handler_data, OutputFormat output_format,
std::optional<Value> default_value = {}
) :
column_(column),
Expand Down Expand Up @@ -916,7 +916,7 @@ class NullValueReducer {
handler) {
auto type_size = data_type_size(column_.type());
handler->default_initialize(
column_.buffer(), start_row * type_size, num_rows * type_size, shared_data_, handler_data_
column_.buffer(), start_row * type_size, num_rows * type_size, shared_data_, *handler_data_
);
} else if (output_format_ != OutputFormat::ARROW || default_value_.has_value()) {
// Arrow does not care what values are in the main buffer where the validity bitmap is zero
Expand Down Expand Up @@ -952,13 +952,13 @@ struct ReduceColumnTask : async::BaseTask {
std::shared_ptr<FrameSliceMap> slice_map_;
std::shared_ptr<PipelineContext> context_;
DecodePathData shared_data_;
std::any& handler_data_;
std::shared_ptr<std::any> handler_data_;
ReadOptions read_options_;

ReduceColumnTask(
SegmentInMemory frame, size_t c, std::shared_ptr<FrameSliceMap> slice_map,
std::shared_ptr<PipelineContext>& context, DecodePathData shared_data, std::any& handler_data,
const ReadOptions& read_options
std::shared_ptr<PipelineContext>& context, DecodePathData shared_data,
std::shared_ptr<std::any> handler_data, const ReadOptions& read_options
) :
frame_(std::move(frame)),
column_index_(c),
Expand Down Expand Up @@ -1050,7 +1050,7 @@ struct ReduceColumnTask : async::BaseTask {

folly::Future<folly::Unit> reduce_and_fix_columns(
std::shared_ptr<PipelineContext>& context, SegmentInMemory& frame, const ReadOptions& read_options,
std::any& handler_data
std::shared_ptr<std::any> handler_data
) {
ARCTICDB_SAMPLE_DEFAULT(ReduceAndFixStringCol)
ARCTICDB_DEBUG(log::version(), "Reduce and fix columns");
Expand All @@ -1074,7 +1074,7 @@ folly::Future<folly::Unit> reduce_and_fix_columns(
static const auto batch_size = ConfigsMap::instance()->get_int("ReduceColumns.BatchSize", 100);
return folly::collect(folly::window(
std::move(fields_to_reduce),
[context, frame, slice_map, shared_data, read_options, &handler_data](size_t field
[context, frame, slice_map, shared_data, read_options, handler_data](size_t field
) mutable {
return async::submit_cpu_task(ReduceColumnTask(
frame, field, slice_map, context, shared_data, handler_data, read_options
Expand All @@ -1089,7 +1089,7 @@ folly::Future<folly::Unit> reduce_and_fix_columns(
folly::Future<SegmentInMemory> fetch_data(
SegmentInMemory&& frame, const std::shared_ptr<PipelineContext>& context,
const std::shared_ptr<stream::StreamSource>& ssource, const ReadQuery& read_query,
const ReadOptions& read_options, DecodePathData shared_data, std::any& handler_data
const ReadOptions& read_options, DecodePathData shared_data, std::shared_ptr<std::any> handler_data
) {
ARCTICDB_SAMPLE_DEFAULT(FetchSlices)
if (frame.empty())
Expand All @@ -1108,17 +1108,17 @@ folly::Future<SegmentInMemory> fetch_data(
frame = frame,
dynamic_schema = dynamic_schema,
shared_data,
&handler_data,
handler_data,
read_query,
read_options](auto&& ks) mutable {
auto key_seg = std::forward<storage::KeySegmentPair>(ks);
if (dynamic_schema) {
decode_into_frame_dynamic(
frame, row, key_seg, shared_data, handler_data, read_query, read_options
frame, row, key_seg, shared_data, *handler_data, read_query, read_options
);
} else {
decode_into_frame_static(
frame, row, key_seg, shared_data, handler_data, read_query, read_options
frame, row, key_seg, shared_data, *handler_data, read_query, read_options
);
}

Expand Down
4 changes: 2 additions & 2 deletions cpp/arcticdb/pipeline/read_frame.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ void mark_index_slices(const std::shared_ptr<PipelineContext>& context);
folly::Future<SegmentInMemory> fetch_data(
SegmentInMemory&& frame, const std::shared_ptr<PipelineContext>& context,
const std::shared_ptr<stream::StreamSource>& ssource, const ReadQuery& read_query,
const ReadOptions& read_options, DecodePathData shared_data, std::any& handler_data
const ReadOptions& read_options, DecodePathData shared_data, std::shared_ptr<std::any> handler_data
);

void decode_into_frame_static(
Expand All @@ -86,7 +86,7 @@ void decode_into_frame_dynamic(

folly::Future<folly::Unit> reduce_and_fix_columns(
std::shared_ptr<PipelineContext>& context, SegmentInMemory& frame, const ReadOptions& read_options,
std::any& handler_data
std::shared_ptr<std::any> handler_data
);

StreamDescriptor get_filtered_descriptor(
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/storage/file/file_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ void write_dataframe_to_file_internal(
version_store::ReadVersionOutput read_dataframe_from_file_internal(
const StreamId& stream_id, const std::string& path, const std::shared_ptr<ReadQuery>& read_query,
const ReadOptions& read_options, const arcticdb::proto::encoding::VariantCodec& codec_opts,
std::any& handler_data
std::shared_ptr<std::any> handler_data
) {
auto config = storage::file::pack_config(path, codec_opts);
storage::LibraryPath lib_path{std::string{"file"}, fmt::format("{}", stream_id)};
Expand Down
3 changes: 2 additions & 1 deletion cpp/arcticdb/storage/test/test_memory_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ TEST(InMemory, ReadTwice) {

auto read_query = std::make_shared<ReadQuery>();
register_native_handler_data_factory();
auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE);
auto handler_data =
std::make_shared<std::any>(TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE));
auto read_result1 = version_store.read_dataframe_version_internal(
symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data
);
Expand Down
19 changes: 10 additions & 9 deletions cpp/arcticdb/version/local_versioned_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ VersionIdentifier get_version_identifier(

ReadVersionWithNodesOutput LocalVersionedEngine::read_dataframe_version_internal(
const StreamId& stream_id, const VersionQuery& version_query, const std::shared_ptr<ReadQuery>& read_query,
const ReadOptions& read_options, std::any& handler_data
const ReadOptions& read_options, std::shared_ptr<std::any> handler_data
) {
py::gil_scoped_release release_gil;
const auto identifier = util::variant_match(
Expand Down Expand Up @@ -1312,7 +1312,7 @@ VersionedItem LocalVersionedEngine::defragment_symbol_data(
std::vector<std::variant<ReadVersionWithNodesOutput, DataError>> LocalVersionedEngine::batch_read_internal(
const std::vector<StreamId>& stream_ids, const std::vector<VersionQuery>& version_queries,
std::vector<std::shared_ptr<ReadQuery>>& read_queries, const BatchReadOptions& batch_read_options,
std::any& handler_data
std::shared_ptr<std::any> handler_data
) {
py::gil_scoped_release release_gil;
if (stream_ids.empty()) {
Expand All @@ -1336,7 +1336,7 @@ std::vector<std::variant<ReadVersionWithNodesOutput, DataError>> LocalVersionedE
read_query =
read_queries.empty() ? std::make_shared<ReadQuery>() : read_queries[idx],
read_options = batch_read_options.at(idx),
&handler_data](auto&& opt_index_key) {
handler_data](auto&& opt_index_key) {
auto version_info = get_version_identifier(
stream_ids[idx],
version_queries[idx],
Expand All @@ -1351,7 +1351,7 @@ std::vector<std::variant<ReadVersionWithNodesOutput, DataError>> LocalVersionedE
read_query =
read_queries.empty() ? std::make_shared<ReadQuery>() : read_queries[idx],
read_options = batch_read_options.at(idx),
&handler_data](ReadVersionOutput&& result) {
handler_data](ReadVersionOutput&& result) {
auto& keys = result.frame_and_descriptor_.keys_;
if (keys.empty()) {
return folly::makeFuture(ReadVersionWithNodesOutput{std::move(result), {}});
Expand Down Expand Up @@ -1477,9 +1477,10 @@ MultiSymbolReadOutput LocalVersionedEngine::batch_read_and_join_internal(
clause->set_component_manager(component_manager);
}
auto clauses_ptr = std::make_shared<std::vector<std::shared_ptr<Clause>>>(std::move(clauses));
std::shared_ptr<std::any> handler_data_ptr(std::shared_ptr<std::any>{}, &handler_data);
return folly::collect(symbol_processing_result_futs)
.via(&async::io_executor())
.thenValueInline([this, &handler_data, clauses_ptr, component_manager, read_options](
.thenValueInline([this, handler_data_ptr, clauses_ptr, component_manager, read_options](
std::vector<SymbolProcessingResult>&& symbol_processing_results
) mutable {
auto [input_schemas, entity_ids, res_versioned_items, res_metadatas] =
Expand All @@ -1493,14 +1494,14 @@ MultiSymbolReadOutput LocalVersionedEngine::batch_read_and_join_internal(
std::shared_ptr<ColRange>>(*component_manager, std::move(processed_entity_ids));
return collect_segments(std::move(proc));
})
.thenValueInline([store = store(), &handler_data, pipeline_context, read_options](
.thenValueInline([store = store(), handler_data_ptr, pipeline_context, read_options](
std::vector<SliceAndKey>&& slice_and_keys
) mutable {
return prepare_output_frame(
std::move(slice_and_keys), pipeline_context, store, read_options, handler_data
std::move(slice_and_keys), pipeline_context, store, read_options, handler_data_ptr
);
})
.thenValueInline([&handler_data,
.thenValueInline([handler_data_ptr,
pipeline_context,
res_versioned_items,
res_metadatas,
Expand All @@ -1510,7 +1511,7 @@ MultiSymbolReadOutput LocalVersionedEngine::batch_read_and_join_internal(
ReadOptions read_options_with_dynamic_schema = read_options.clone();
read_options_with_dynamic_schema.set_dynamic_schema(true);
return reduce_and_fix_columns(
pipeline_context, frame, read_options_with_dynamic_schema, handler_data
pipeline_context, frame, read_options_with_dynamic_schema, handler_data_ptr
)
.thenValueInline([pipeline_context,
frame,
Expand Down
4 changes: 2 additions & 2 deletions cpp/arcticdb/version/local_versioned_engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ class LocalVersionedEngine : public VersionedEngine {

ReadVersionWithNodesOutput read_dataframe_version_internal(
const StreamId& stream_id, const VersionQuery& version_query, const std::shared_ptr<ReadQuery>& read_query,
const ReadOptions& read_options, std::any& handler_data
const ReadOptions& read_options, std::shared_ptr<std::any> handler_data
) override;

VersionedItem read_modify_write_internal(
Expand Down Expand Up @@ -289,7 +289,7 @@ class LocalVersionedEngine : public VersionedEngine {
std::vector<std::variant<ReadVersionWithNodesOutput, DataError>> batch_read_internal(
const std::vector<StreamId>& stream_ids, const std::vector<VersionQuery>& version_queries,
std::vector<std::shared_ptr<ReadQuery>>& read_queries, const BatchReadOptions& batch_read_options,
std::any& handler_data
std::shared_ptr<std::any> handler_data
);

MultiSymbolReadOutput batch_read_and_join_internal(
Expand Down
20 changes: 12 additions & 8 deletions cpp/arcticdb/version/python_bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,11 @@ void register_bindings(py::module& version, py::exception<arcticdb::ArcticExcept
"read_dataframe_from_file",
[](StreamId sid, std::string path, std::shared_ptr<ReadQuery>& read_query, const ReadOptions& read_options
) {
auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(read_options.output_format());
auto handler_data = std::make_shared<std::any>(
TypeHandlerRegistry::instance()->get_handler_data(read_options.output_format())
);
return adapt_read_df(
read_dataframe_from_file(sid, path, read_query, read_options, handler_data), &handler_data
read_dataframe_from_file(sid, path, read_query, read_options, handler_data), handler_data.get()
);
}
);
Expand Down Expand Up @@ -792,11 +794,12 @@ void register_bindings(py::module& version, py::exception<arcticdb::ArcticExcept
const VersionQuery& version_query,
const std::shared_ptr<ReadQuery>& read_query,
const ReadOptions& read_options) {
auto handler_data =
TypeHandlerRegistry::instance()->get_handler_data(read_options.output_format());
auto handler_data = std::make_shared<std::any>(
TypeHandlerRegistry::instance()->get_handler_data(read_options.output_format())
);
return adapt_read_df(
v.read_dataframe_version(sid, version_query, read_query, read_options, handler_data),
&handler_data
handler_data.get()
);
},
py::call_guard<SingleThreadMutexHolder>(),
Expand Down Expand Up @@ -938,13 +941,14 @@ void register_bindings(py::module& version, py::exception<arcticdb::ArcticExcept
const std::vector<VersionQuery>& version_queries,
std::vector<std::shared_ptr<ReadQuery>>& read_queries,
const BatchReadOptions& batch_read_options) {
auto handler_data =
TypeHandlerRegistry::instance()->get_handler_data(batch_read_options.output_format());
auto handler_data = std::make_shared<std::any>(
TypeHandlerRegistry::instance()->get_handler_data(batch_read_options.output_format())
);
return python_util::adapt_read_dfs(
v.batch_read(
stream_ids, version_queries, read_queries, batch_read_options, handler_data
),
&handler_data
handler_data.get()
);
},
py::call_guard<SingleThreadMutexHolder>(),
Expand Down
Loading
Loading