From 3264623d8ebbd831a316d542cf2d191208b93400 Mon Sep 17 00:00:00 2001 From: Benjamin Wagner Date: Fri, 3 Nov 2023 11:20:29 +0100 Subject: [PATCH] Fix Filter Code Generation Our ROF implementation showed some correctness issues in how we implement code generation for filters. A filter looks as follows: ``` /- IU Prov 1 ----------------> Filter 1 ---> Sink 1 Src / \- IU Prov 2 -> FilterScope ---> Filter 2 -> Sink 2 \-----------------/ ``` The IUs going from FilterScope to the filters are void-typed pseudo IUs. The problem was that we could run into cases where FilterScope would generate its nested `IF` before both IU providers were opened. In those cases, we would generate the IU provider iterator only within the nested filter scope, causing it to "lag" behind the other iterator, producing incorrect results. The core problem is that we do not model a code generation dependency between `IU Prov 1 -> FilterScope`. This commit ensures that all input IU providers generate their code before the `if` is opened. Now, `Filter 1` and `Filter 2` only request code generation of `FilterScope`, and `FilterScope` requests generating both IU providers. This is done in a somewhat hacky way; if we rebuilt the system today, we should not model code generation dependencies through void-typed pseudo IUs. We should instead probably model IU and codegen dependencies separately. 
--- src/algebra/Filter.cpp | 7 ++- src/algebra/Join.cpp | 9 ++- src/algebra/suboperators/ColumnFilter.cpp | 58 ++++++++++++-------- src/algebra/suboperators/ColumnFilter.h | 13 +++-- src/codegen/backend_c/BackendC.cpp | 2 +- src/exec/PipelineExecutor.cpp | 3 - src/exec/runners/InterpretedRunner.cpp | 6 +- src/interpreter/ColumnFilterFragmentizer.cpp | 34 +++++++----- test/suboperators/test_runtime_param.cpp | 10 ++-- tools/inkfuse_bench.cpp | 1 - 10 files changed, 83 insertions(+), 60 deletions(-) diff --git a/src/algebra/Filter.cpp b/src/algebra/Filter.cpp index e172381..95b5136 100644 --- a/src/algebra/Filter.cpp +++ b/src/algebra/Filter.cpp @@ -32,13 +32,16 @@ void Filter::decay(PipelineDAG& dag) const children[0]->decay(dag); auto& pipe = dag.getCurrentPipeline(); // Attach the control flow sub-operator. - auto scope = ColumnFilterScope::build(this, filter_iu, pseudo_iu); - pipe.attachSuboperator(std::move(scope)); + auto& scope_supop = pipe.attachSuboperator(ColumnFilterScope::build(this, filter_iu, pseudo_iu)); + auto& scope = reinterpret_cast(scope_supop); // Attach the logic operators performing the copies. for (size_t k = 0; k < redefined.size(); ++k) { const IU& old_iu = *to_redefine[k]; const IU& new_iu = redefined[k]; auto logic = ColumnFilterLogic::build(this, pseudo_iu, old_iu, new_iu); + // Attach filter dependency to the scope suboperator. This makes sure the source IUs + // are generated before the filter. + scope.attachFilterLogicDependency(*logic, old_iu); pipe.attachSuboperator(std::move(logic)); } } diff --git a/src/algebra/Join.cpp b/src/algebra/Join.cpp index ce73d62..9c1795c 100644 --- a/src/algebra/Join.cpp +++ b/src/algebra/Join.cpp @@ -286,14 +286,17 @@ void Join::decayPkJoin(inkfuse::PipelineDAG& dag) const { } // 2.3 Filter on probe matches. 
- probe_pipe.attachSuboperator(ColumnFilterScope::build(this, *lookup_right, *filter_pseudo_iu)); + auto& filter_scope_subop = probe_pipe.attachSuboperator(ColumnFilterScope::build(this, *lookup_right, *filter_pseudo_iu)); + auto& filter_scope = reinterpret_cast(filter_scope_subop); // The filter on the build site filters "itself". This has some repercussions on the repiping // behaviour of the suboperator and needs to be passed explicitly. - probe_pipe.attachSuboperator(ColumnFilterLogic::build(this, *filter_pseudo_iu, *lookup_right, *filtered_build, /* filter_type= */ lookup_right->type, /* filters_itself= */ true)); + auto& filter_1 = probe_pipe.attachSuboperator(ColumnFilterLogic::build(this, *filter_pseudo_iu, *lookup_right, *filtered_build, /* filter_type= */ lookup_right->type, /* filters_itself= */ true)); + filter_scope.attachFilterLogicDependency(filter_1, *lookup_right); if (type != JoinType::LeftSemi) { // If we need to produce columns on the probe side, we also have to filter the probe result. // Note: the filtered ByteArray from the probe side becomes a Char* after filtering. - probe_pipe.attachSuboperator(ColumnFilterLogic::build(this, *filter_pseudo_iu, *scratch_pad_right, *filtered_probe, /* filter_type_= */ lookup_right->type)); + auto& filter_2 = probe_pipe.attachSuboperator(ColumnFilterLogic::build(this, *filter_pseudo_iu, *scratch_pad_right, *filtered_probe, /* filter_type_= */ lookup_right->type)); + filter_scope.attachFilterLogicDependency(filter_2, *scratch_pad_right); } // 2.4 Unpack everything. 
diff --git a/src/algebra/suboperators/ColumnFilter.cpp b/src/algebra/suboperators/ColumnFilter.cpp index 723a0eb..ff5a1a0 100644 --- a/src/algebra/suboperators/ColumnFilter.cpp +++ b/src/algebra/suboperators/ColumnFilter.cpp @@ -5,18 +5,35 @@ namespace inkfuse { -SuboperatorArc ColumnFilterScope::build(const RelAlgOp* source_, const IU& filter_iu_, const IU& pseudo) -{ +SuboperatorArc ColumnFilterScope::build(const RelAlgOp* source_, const IU& filter_iu_, const IU& pseudo) { return SuboperatorArc{new ColumnFilterScope(source_, filter_iu_, pseudo)}; } ColumnFilterScope::ColumnFilterScope(const RelAlgOp* source_, const IU& filter_iu_, const IU& pseudo) -: TemplatedSuboperator(source_, std::vector{&pseudo}, std::vector{&filter_iu_}) -{ + : TemplatedSuboperator(source_, std::vector{&pseudo}, std::vector{&filter_iu_}) { } -void ColumnFilterScope::consumeAllChildren(CompilationContext& context) -{ +void ColumnFilterScope::open(CompilationContext& context) { + if (filter_logic_dependencies.empty()) { + throw std::runtime_error("ColumnFilterScope must be provided with all filter dependencies."); + } + // We first request all dependencies that the following `ColumnFilterLogics` will have. + // This ensures that their iterators are generated in the outer scope. + for (auto& [op, iu] : filter_logic_dependencies) { + context.requestIU(*op, *iu); + } + // Now we request our actual filter IU. Once that code has been generated we enter `consumeAllChildren`. 
+ for (const IU* in : source_ius) { + context.requestIU(*this, *in); + } +} + +void ColumnFilterScope::attachFilterLogicDependency(Suboperator& subop, const IU& iu) { + assert(dynamic_cast(&subop)); + filter_logic_dependencies.push_back({&subop, &iu}); +} + +void ColumnFilterScope::consumeAllChildren(CompilationContext& context) { auto& builder = context.getFctBuilder(); const auto& program = context.getProgram(); @@ -33,8 +50,7 @@ void ColumnFilterScope::consumeAllChildren(CompilationContext& context) } } -void ColumnFilterScope::close(CompilationContext& context) -{ +void ColumnFilterScope::close(CompilationContext& context) { // We can now close the sub-operator, this will terminate the if statement // and reinstall the original block. opt_if->End(); @@ -42,19 +58,16 @@ void ColumnFilterScope::close(CompilationContext& context) context.notifyOpClosed(*this); } -std::string ColumnFilterScope::id() const -{ +std::string ColumnFilterScope::id() const { return "ColumnFilterScope"; } -SuboperatorArc ColumnFilterLogic::build(const RelAlgOp* source_, const IU& pseudo, const IU& incoming_, const IU& redefined, IR::TypeArc filter_type_, bool filters_itself_) -{ +SuboperatorArc ColumnFilterLogic::build(const RelAlgOp* source_, const IU& pseudo, const IU& incoming_, const IU& redefined, IR::TypeArc filter_type_, bool filters_itself_) { return SuboperatorArc{new ColumnFilterLogic(source_, pseudo, incoming_, redefined, std::move(filter_type_), filters_itself_)}; } ColumnFilterLogic::ColumnFilterLogic(const RelAlgOp* source_, const IU& pseudo, const IU& incoming, const IU& redefined, IR::TypeArc filter_type_, bool filters_itself_) - : TemplatedSuboperator(source_, std::vector{&redefined}, std::vector{&pseudo, &incoming}), filter_type(std::move(filter_type_)), filters_itself(filters_itself_) -{ + : TemplatedSuboperator(source_, std::vector{&redefined}, std::vector{&pseudo, &incoming}), filter_type(std::move(filter_type_)), filters_itself(filters_itself_) { // When filtering a 
ByteArray something subtle happens: // The result column becomes a char*. This way we don't have to copy the entire byte array, but rather // just pointers. An alternative implementation would be to have variable size sinks and to do @@ -66,18 +79,16 @@ ColumnFilterLogic::ColumnFilterLogic(const RelAlgOp* source_, const IU& pseudo, } } -void ColumnFilterLogic::open(CompilationContext& context) -{ - // First request the incoming IU that gets filtered. This is extremely important as their iterator +void ColumnFilterLogic::open(CompilationContext& context) { + // We do not request incoming IU that gets filtered. This is extremely important as their iterator // should be generated outside of the `if`. If we don't descend the DAG this way, then we might generate // the variable-size state iterator inside the nested control flow. - context.requestIU(*this, *source_ius[1]); - // Only now request generation of the `if`. We know that the variable-sized iterators are outside the `if`. + + // Only request generation of the `if`. The ColumnFilterScope will generate the input IU that will be filtered. context.requestIU(*this, *source_ius[0]); } -void ColumnFilterLogic::consumeAllChildren(CompilationContext& context) -{ +void ColumnFilterLogic::consumeAllChildren(CompilationContext& context) { auto& builder = context.getFctBuilder(); // Get the definition of the filter IU - this is the second source IU. 
@@ -97,8 +108,7 @@ void ColumnFilterLogic::consumeAllChildren(CompilationContext& context) context.notifyIUsReady(*this); } -std::string ColumnFilterLogic::id() const -{ +std::string ColumnFilterLogic::id() const { if (filters_itself) { return "ColumnSelfFilterLogic_" + filter_type->id() + "_" + source_ius[1]->type->id(); } else { @@ -106,4 +116,4 @@ std::string ColumnFilterLogic::id() const } } -} \ No newline at end of file +} diff --git a/src/algebra/suboperators/ColumnFilter.h b/src/algebra/suboperators/ColumnFilter.h index bbc5426..b2615c2 100644 --- a/src/algebra/suboperators/ColumnFilter.h +++ b/src/algebra/suboperators/ColumnFilter.h @@ -18,7 +18,6 @@ namespace inkfuse { /// Scoping operator building the if statement needed in a filter. struct ColumnFilterScope : public TemplatedSuboperator { - /// Set up a ColumnFilterScope that consumes the filter IU and defines a new pseudo IU. static SuboperatorArc build(const RelAlgOp* source_, const IU& filter_iu_, const IU& pseudo); @@ -30,6 +29,11 @@ struct ColumnFilterScope : public TemplatedSuboperator { void consumeAllChildren(CompilationContext& context) override; /// On close we can terminate the if block we have created for downstream consumers. void close(CompilationContext& context) override; + /// On open we request the filter iu, but also all dependent IUs from ColumnFilterLogic. + void open(CompilationContext& context) override; + + /// Attach a dependency of a future ColumnFilterLogic. + void attachFilterLogicDependency(Suboperator& subop, const IU& iu); std::string id() const override; @@ -38,11 +42,14 @@ struct ColumnFilterScope : public TemplatedSuboperator { /// In-flight if statement being generated. std::optional opt_if; + /// The `ColumnFilterScope` requests the input IUs for the `ColumnFilterLogic`. + /// This is done to make sure that their iterators are generated outside of the + /// nested scope created by the `if`. 
+ std::vector> filter_logic_dependencies; }; /// Logic operator redefining the IU as a copy of the old one. struct ColumnFilterLogic : public TemplatedSuboperator { - /// Set up a ColumnFilterLogic that consumes the ColumnFilterScope pseudo IU and redefines the incoming one. static SuboperatorArc build(const RelAlgOp* source_, const IU& pseudo, const IU& incoming, const IU& redefined, IR::TypeArc filter_type_ = IR::Bool::build(), bool filters_itself = false); @@ -65,9 +72,7 @@ struct ColumnFilterLogic : public TemplatedSuboperator { /// Does this filter filter itself? I.e. the filter IU is the same one as the /// incoming one that gets redefined? bool filters_itself; - }; - } #endif //INKFUSE_COLUMNCOMPACTOR_H diff --git a/src/codegen/backend_c/BackendC.cpp b/src/codegen/backend_c/BackendC.cpp index 1460fd6..da94425 100644 --- a/src/codegen/backend_c/BackendC.cpp +++ b/src/codegen/backend_c/BackendC.cpp @@ -9,7 +9,7 @@ namespace inkfuse { namespace { -static constexpr bool debug_mode = true; +static constexpr bool debug_mode = false; /// Generate the path for the c program. std::string path(std::string_view program_name) { diff --git a/src/exec/PipelineExecutor.cpp b/src/exec/PipelineExecutor.cpp index 0a9096f..e7bbb2c 100644 --- a/src/exec/PipelineExecutor.cpp +++ b/src/exec/PipelineExecutor.cpp @@ -201,8 +201,6 @@ PipelineExecutor::PipelineStats PipelineExecutor::runPipeline() { // Store how long we were stalled waiting for compilation to finish. result.codegen_microseconds = std::chrono::duration_cast(compilation_done_ts - start_execution_ts).count(); - std::cout << "Found " << compilation_jobs.size() << " JIT fragments \n"; - for (auto& compiled_fragment : compile_state) { assert(compiled_fragment->compiled); compiled_fragment->compiled->setUpState(); @@ -294,7 +292,6 @@ void PipelineExecutor::setUpInterpreted() { std::vector PipelineExecutor::setUpFusedAsync(ExecutionMode mode) { // Create a compile state for the respective JIT interval. 
auto attach_compile_state = [&](size_t start, size_t end) { - std::cout << "Attaching compile state " << start << ", " << end << std::endl; auto repiped = pipe.repipeRequired(start, end); std::pair jit_interval{start, end}; // Create a fragment name that will be unique across different ROF intervals in the pipeline. diff --git a/src/exec/runners/InterpretedRunner.cpp b/src/exec/runners/InterpretedRunner.cpp index 173c2e2..19660ac 100644 --- a/src/exec/runners/InterpretedRunner.cpp +++ b/src/exec/runners/InterpretedRunner.cpp @@ -42,9 +42,9 @@ InterpretedRunner::~InterpretedRunner() { Suboperator::PickMorselResult InterpretedRunner::pickMorsel(size_t thread_id) { if (mode == ExecutionMode::ZeroCopyScan && !pick_from_source_table) { - // If this is a suboperator interpreting a table scan, but not the first one, - // then we should not pick from the table scan. We need to bind to the original - // morsel of the first interpreter instead. + // If this is a suboperator interpreting a table scan, but not the first one, + // then we should not pick from the table scan. We need to bind to the original + // morsel of the first interpreter instead. return Suboperator::PickedMorsel{ .morsel_size = 1, }; diff --git a/src/interpreter/ColumnFilterFragmentizer.cpp b/src/interpreter/ColumnFilterFragmentizer.cpp index 0627df6..e92370f 100644 --- a/src/interpreter/ColumnFilterFragmentizer.cpp +++ b/src/interpreter/ColumnFilterFragmentizer.cpp @@ -14,42 +14,48 @@ const std::vector condition_types = {IR::Bool::build(), IR::Pointer ColumnFilterFragmentizer::ColumnFilterFragmentizer() { // Not self filtering. - for (auto& type: types) { - for (auto& condition_type: condition_types) { + for (auto& type : types) { + for (auto& condition_type : condition_types) { auto& [name, pipe] = pipes.emplace_back(); const auto& filter_iu = generated_ius.emplace_back(condition_type, "filter"); // Note that the void IU will not be produced in repipeAll, i.e. the fragment does not have a FuseChunkSink. 
const auto& pseudo_iu = generated_ius.emplace_back(IR::Void::build(), ""); const auto& target_iu = generated_ius.emplace_back(type, "target_iu"); const auto& target_iu_out = generated_ius.emplace_back(type, "target_iu_filtered"); - pipe.attachSuboperator(ColumnFilterScope::build(nullptr, filter_iu, pseudo_iu)); + auto& filter_scope_subop = pipe.attachSuboperator(ColumnFilterScope::build(nullptr, filter_iu, pseudo_iu)); + auto& filter_scope = reinterpret_cast(filter_scope_subop); auto& filter_subop = pipe.attachSuboperator(ColumnFilterLogic::build(nullptr, pseudo_iu, target_iu, target_iu_out, condition_type, false)); + filter_scope.attachFilterLogicDependency(filter_subop, target_iu); name = filter_subop.id(); } } // Self filtering. - for (auto& condition_type: condition_types) { - auto& [name, pipe] = pipes.emplace_back(); - const auto& filter_iu = generated_ius.emplace_back(condition_type, "filter"); - // Note that the void IU will not be produced in repipeAll, i.e. the fragment does not have a FuseChunkSink. - const auto& pseudo_iu = generated_ius.emplace_back(IR::Void::build(), ""); - const auto& target_iu_out = generated_ius.emplace_back(condition_type, "target_iu_filtered"); - pipe.attachSuboperator(ColumnFilterScope::build(nullptr, filter_iu, pseudo_iu)); - auto& filter_subop = pipe.attachSuboperator(ColumnFilterLogic::build(nullptr, pseudo_iu, filter_iu, target_iu_out, condition_type, true)); - name = filter_subop.id(); + for (auto& condition_type : condition_types) { + auto& [name, pipe] = pipes.emplace_back(); + const auto& filter_iu = generated_ius.emplace_back(condition_type, "filter"); + // Note that the void IU will not be produced in repipeAll, i.e. the fragment does not have a FuseChunkSink. 
+ const auto& pseudo_iu = generated_ius.emplace_back(IR::Void::build(), ""); + const auto& target_iu_out = generated_ius.emplace_back(condition_type, "target_iu_filtered"); + auto& filter_scope_subop = pipe.attachSuboperator(ColumnFilterScope::build(nullptr, filter_iu, pseudo_iu)); + auto& filter_scope = reinterpret_cast(filter_scope_subop); + auto& filter_subop = pipe.attachSuboperator(ColumnFilterLogic::build(nullptr, pseudo_iu, filter_iu, target_iu_out, condition_type, true)); + filter_scope.attachFilterLogicDependency(filter_subop, filter_iu); + name = filter_subop.id(); } // Filter on a ByteArray - This one is special as it returns a Char*. // We only copy over the pointers and not the entire array. { - for (auto& condition_type: condition_types) { + for (auto& condition_type : condition_types) { auto& [name, pipe] = pipes.emplace_back(); const auto& filter_iu = generated_ius.emplace_back(condition_type, "filter"); // Note that the void IU will not be produced in repipeAll, i.e. the fragment does not have a FuseChunkSink. 
const auto& pseudo_iu = generated_ius.emplace_back(IR::Void::build(), ""); const auto& target_iu = generated_ius.emplace_back(IR::ByteArray::build(0), "target_iu"); const auto& target_iu_out = generated_ius.emplace_back(IR::Pointer::build(IR::Char::build()), "target_iu_filtered"); - pipe.attachSuboperator(ColumnFilterScope::build(nullptr, filter_iu, pseudo_iu)); + auto& filter_scope_subop = pipe.attachSuboperator(ColumnFilterScope::build(nullptr, filter_iu, pseudo_iu)); + auto& filter_scope = reinterpret_cast(filter_scope_subop); auto& filter_subop = pipe.attachSuboperator(ColumnFilterLogic::build(nullptr, pseudo_iu, target_iu, target_iu_out, condition_type, false)); + filter_scope.attachFilterLogicDependency(filter_subop, target_iu); name = filter_subop.id(); } } diff --git a/test/suboperators/test_runtime_param.cpp b/test/suboperators/test_runtime_param.cpp index 4f542b1..62d4342 100644 --- a/test/suboperators/test_runtime_param.cpp +++ b/test/suboperators/test_runtime_param.cpp @@ -10,12 +10,12 @@ namespace { // Test the runtime parameter code generation with a runtime param that gets loaded at runtime. 
TEST(test_runtime_param, runtime_expression_no_val) { Pipeline pipe; - CompilationContext context("runtime_param_test", pipe); IU source(IR::UnsignedInt::build(4)); IU provided(IR::UnsignedInt::build(4)); auto l_expr = RuntimeExpressionSubop::build(nullptr, {&provided}, {&source}, ExpressionOp::ComputeNode::Type::Add, IR::UnsignedInt::build(4)); pipe.attachSuboperator(l_expr); - pipe.repipeAll(0, 1); + auto repiped = pipe.repipeAll(0, 1); + CompilationContext context("runtime_param_test", *repiped); EXPECT_NO_THROW(context.compile()); } @@ -24,16 +24,16 @@ TEST(test_runtime_param, runtime_expression_val) { RuntimeExpressionParams params; params.dataSet(IR::UI<4>::build(4)); Pipeline pipe; - CompilationContext context("runtime_param_test", pipe); IU source(IR::UnsignedInt::build(4)); IU provided(IR::UnsignedInt::build(4)); auto l_expr = RuntimeExpressionSubop::build(nullptr, {&provided}, {&source}, ExpressionOp::ComputeNode::Type::Add, IR::UnsignedInt::build(4)); static_cast(*l_expr).attachRuntimeParams(std::move(params)); pipe.attachSuboperator(l_expr); - pipe.repipeAll(0, 1); + auto repiped = pipe.repipeAll(0, 1); + CompilationContext context("runtime_param_test", *repiped); EXPECT_NO_THROW(context.compile()); } } -} \ No newline at end of file +} diff --git a/tools/inkfuse_bench.cpp b/tools/inkfuse_bench.cpp index 396a5f4..662b00b 100644 --- a/tools/inkfuse_bench.cpp +++ b/tools/inkfuse_bench.cpp @@ -52,7 +52,6 @@ const std::vector> backe {"fused", PipelineExecutor::ExecutionMode::Fused}, {"rof", PipelineExecutor::ExecutionMode::ROF}, }; - } int main(int argc, char* argv[]) {