diff --git a/src/algebra/Filter.cpp b/src/algebra/Filter.cpp index e172381..95b5136 100644 --- a/src/algebra/Filter.cpp +++ b/src/algebra/Filter.cpp @@ -32,13 +32,16 @@ void Filter::decay(PipelineDAG& dag) const children[0]->decay(dag); auto& pipe = dag.getCurrentPipeline(); // Attach the control flow sub-operator. - auto scope = ColumnFilterScope::build(this, filter_iu, pseudo_iu); - pipe.attachSuboperator(std::move(scope)); + auto& scope_supop = pipe.attachSuboperator(ColumnFilterScope::build(this, filter_iu, pseudo_iu)); + auto& scope = reinterpret_cast(scope_supop); // Attach the logic operators performing the copies. for (size_t k = 0; k < redefined.size(); ++k) { const IU& old_iu = *to_redefine[k]; const IU& new_iu = redefined[k]; auto logic = ColumnFilterLogic::build(this, pseudo_iu, old_iu, new_iu); + // Attach filter dependency to the scope suboperator. This makes sure the source IUs + // are generated before the filter. + scope.attachFilterLogicDependency(*logic, old_iu); pipe.attachSuboperator(std::move(logic)); } } diff --git a/src/algebra/Join.cpp b/src/algebra/Join.cpp index ce73d62..9c1795c 100644 --- a/src/algebra/Join.cpp +++ b/src/algebra/Join.cpp @@ -286,14 +286,17 @@ void Join::decayPkJoin(inkfuse::PipelineDAG& dag) const { } // 2.3 Filter on probe matches. - probe_pipe.attachSuboperator(ColumnFilterScope::build(this, *lookup_right, *filter_pseudo_iu)); + auto& filter_scope_subop = probe_pipe.attachSuboperator(ColumnFilterScope::build(this, *lookup_right, *filter_pseudo_iu)); + auto& filter_scope = reinterpret_cast(filter_scope_subop); // The filter on the build site filters "itself". This has some repercussions on the repiping // behaviour of the suboperator and needs to be passed explicitly. 
- probe_pipe.attachSuboperator(ColumnFilterLogic::build(this, *filter_pseudo_iu, *lookup_right, *filtered_build, /* filter_type= */ lookup_right->type, /* filters_itself= */ true)); + auto& filter_1 = probe_pipe.attachSuboperator(ColumnFilterLogic::build(this, *filter_pseudo_iu, *lookup_right, *filtered_build, /* filter_type= */ lookup_right->type, /* filters_itself= */ true)); + filter_scope.attachFilterLogicDependency(filter_1, *lookup_right); if (type != JoinType::LeftSemi) { // If we need to produce columns on the probe side, we also have to filter the probe result. // Note: the filtered ByteArray from the probe side becomes a Char* after filtering. - probe_pipe.attachSuboperator(ColumnFilterLogic::build(this, *filter_pseudo_iu, *scratch_pad_right, *filtered_probe, /* filter_type_= */ lookup_right->type)); + auto& filter_2 = probe_pipe.attachSuboperator(ColumnFilterLogic::build(this, *filter_pseudo_iu, *scratch_pad_right, *filtered_probe, /* filter_type_= */ lookup_right->type)); + filter_scope.attachFilterLogicDependency(filter_2, *scratch_pad_right); } // 2.4 Unpack everything. 
diff --git a/src/algebra/suboperators/ColumnFilter.cpp b/src/algebra/suboperators/ColumnFilter.cpp index 723a0eb..ff5a1a0 100644 --- a/src/algebra/suboperators/ColumnFilter.cpp +++ b/src/algebra/suboperators/ColumnFilter.cpp @@ -5,18 +5,35 @@ namespace inkfuse { -SuboperatorArc ColumnFilterScope::build(const RelAlgOp* source_, const IU& filter_iu_, const IU& pseudo) -{ +SuboperatorArc ColumnFilterScope::build(const RelAlgOp* source_, const IU& filter_iu_, const IU& pseudo) { return SuboperatorArc{new ColumnFilterScope(source_, filter_iu_, pseudo)}; } ColumnFilterScope::ColumnFilterScope(const RelAlgOp* source_, const IU& filter_iu_, const IU& pseudo) -: TemplatedSuboperator(source_, std::vector{&pseudo}, std::vector{&filter_iu_}) -{ + : TemplatedSuboperator(source_, std::vector{&pseudo}, std::vector{&filter_iu_}) { } -void ColumnFilterScope::consumeAllChildren(CompilationContext& context) -{ +void ColumnFilterScope::open(CompilationContext& context) { + if (filter_logic_dependencies.empty()) { + throw std::runtime_error("ColumnFilterScope must be provided with all filter dependencies."); + } + // We first request all dependencies that the following `ColumnFilterLogics` will have. + // This ensures that their iterators are generated in the outer scope. + for (auto& [op, iu] : filter_logic_dependencies) { + context.requestIU(*op, *iu); + } + // Now we request our actual filter IU. Once that code was generated we enter `consumeAllChildren`. 
+ for (const IU* in : source_ius) { + context.requestIU(*this, *in); + } +} + +void ColumnFilterScope::attachFilterLogicDependency(Suboperator& subop, const IU& iu) { + assert(dynamic_cast(&subop)); + filter_logic_dependencies.push_back({&subop, &iu}); +} + +void ColumnFilterScope::consumeAllChildren(CompilationContext& context) { auto& builder = context.getFctBuilder(); const auto& program = context.getProgram(); @@ -33,8 +50,7 @@ void ColumnFilterScope::consumeAllChildren(CompilationContext& context) } } -void ColumnFilterScope::close(CompilationContext& context) -{ +void ColumnFilterScope::close(CompilationContext& context) { // We can now close the sub-operator, this will terminate the if statement // and reinstall the original block. opt_if->End(); @@ -42,19 +58,16 @@ void ColumnFilterScope::close(CompilationContext& context) context.notifyOpClosed(*this); } -std::string ColumnFilterScope::id() const -{ +std::string ColumnFilterScope::id() const { return "ColumnFilterScope"; } -SuboperatorArc ColumnFilterLogic::build(const RelAlgOp* source_, const IU& pseudo, const IU& incoming_, const IU& redefined, IR::TypeArc filter_type_, bool filters_itself_) -{ +SuboperatorArc ColumnFilterLogic::build(const RelAlgOp* source_, const IU& pseudo, const IU& incoming_, const IU& redefined, IR::TypeArc filter_type_, bool filters_itself_) { return SuboperatorArc{new ColumnFilterLogic(source_, pseudo, incoming_, redefined, std::move(filter_type_), filters_itself_)}; } ColumnFilterLogic::ColumnFilterLogic(const RelAlgOp* source_, const IU& pseudo, const IU& incoming, const IU& redefined, IR::TypeArc filter_type_, bool filters_itself_) - : TemplatedSuboperator(source_, std::vector{&redefined}, std::vector{&pseudo, &incoming}), filter_type(std::move(filter_type_)), filters_itself(filters_itself_) -{ + : TemplatedSuboperator(source_, std::vector{&redefined}, std::vector{&pseudo, &incoming}), filter_type(std::move(filter_type_)), filters_itself(filters_itself_) { // When filtering a 
ByteArray something subtle happens: // The result column becomes a char*. This way we don't have to copy the entire byte array, but rather // just pointers. An alternative implementation would be to have variable size sinks and to do @@ -66,18 +79,16 @@ ColumnFilterLogic::ColumnFilterLogic(const RelAlgOp* source_, const IU& pseudo, } } -void ColumnFilterLogic::open(CompilationContext& context) -{ - // First request the incoming IU that gets filtered. This is extremely important as their iterator +void ColumnFilterLogic::open(CompilationContext& context) { + // We do not request the incoming IU that gets filtered. This is extremely important as their iterator // should be generated outside of the `if`. If we don't descend the DAG this way, then we might generate // the variable-size state iterator inside the nested control flow. - context.requestIU(*this, *source_ius[1]); - // Only now request generation of the `if`. We know that the variable-sized iterators are outside the `if`. + + // Only request generation of the `if`. The ColumnFilterScope will generate the input IU that will be filtered. context.requestIU(*this, *source_ius[0]); } -void ColumnFilterLogic::consumeAllChildren(CompilationContext& context) -{ +void ColumnFilterLogic::consumeAllChildren(CompilationContext& context) { auto& builder = context.getFctBuilder(); // Get the definition of the filter IU - this is the second source IU. 
@@ -97,8 +108,7 @@ void ColumnFilterLogic::consumeAllChildren(CompilationContext& context) context.notifyIUsReady(*this); } -std::string ColumnFilterLogic::id() const -{ +std::string ColumnFilterLogic::id() const { if (filters_itself) { return "ColumnSelfFilterLogic_" + filter_type->id() + "_" + source_ius[1]->type->id(); } else { @@ -106,4 +116,4 @@ std::string ColumnFilterLogic::id() const } } -} \ No newline at end of file +} diff --git a/src/algebra/suboperators/ColumnFilter.h b/src/algebra/suboperators/ColumnFilter.h index bbc5426..b2615c2 100644 --- a/src/algebra/suboperators/ColumnFilter.h +++ b/src/algebra/suboperators/ColumnFilter.h @@ -18,7 +18,6 @@ namespace inkfuse { /// Scoping operator building the if statement needed in a filter. struct ColumnFilterScope : public TemplatedSuboperator { - /// Set up a ColumnFilterScope that consumes the filter IU and defines a new pseudo IU. static SuboperatorArc build(const RelAlgOp* source_, const IU& filter_iu_, const IU& pseudo); @@ -30,6 +29,11 @@ struct ColumnFilterScope : public TemplatedSuboperator { void consumeAllChildren(CompilationContext& context) override; /// On close we can terminate the if block we have created for downstream consumers. void close(CompilationContext& context) override; + /// On open we request the filter iu, but also all dependent IUs from ColumnFilterLogic. + void open(CompilationContext& context) override; + + /// Attach a dependency of a future ColumnFilterLogic. + void attachFilterLogicDependency(Suboperator& subop, const IU& iu); std::string id() const override; @@ -38,11 +42,14 @@ struct ColumnFilterScope : public TemplatedSuboperator { /// In-flight if statement being generated. std::optional opt_if; + /// The `ColumnFilterScope` requests the input IUs for the `ColumnFilterLogic`. + /// This is done to make sure that their iterators are generated outside of the + /// nested scope created by the `if`. 
+ std::vector> filter_logic_dependencies; }; /// Logic operator redefining the IU as a copy of the old one. struct ColumnFilterLogic : public TemplatedSuboperator { - /// Set up a ColumnFilterLogic that consumes the ColumnFilterScope pseudo IU and redefines the incoming one. static SuboperatorArc build(const RelAlgOp* source_, const IU& pseudo, const IU& incoming, const IU& redefined, IR::TypeArc filter_type_ = IR::Bool::build(), bool filters_itself = false); @@ -65,9 +72,7 @@ struct ColumnFilterLogic : public TemplatedSuboperator { /// Does this filter filter itself? I.e. the filter IU is the same one as the /// incoming one that gets redefined? bool filters_itself; - }; - } #endif //INKFUSE_COLUMNCOMPACTOR_H diff --git a/src/codegen/backend_c/BackendC.cpp b/src/codegen/backend_c/BackendC.cpp index 1460fd6..da94425 100644 --- a/src/codegen/backend_c/BackendC.cpp +++ b/src/codegen/backend_c/BackendC.cpp @@ -9,7 +9,7 @@ namespace inkfuse { namespace { -static constexpr bool debug_mode = true; +static constexpr bool debug_mode = false; /// Generate the path for the c program. std::string path(std::string_view program_name) { diff --git a/src/exec/PipelineExecutor.cpp b/src/exec/PipelineExecutor.cpp index 0a9096f..e7bbb2c 100644 --- a/src/exec/PipelineExecutor.cpp +++ b/src/exec/PipelineExecutor.cpp @@ -201,8 +201,6 @@ PipelineExecutor::PipelineStats PipelineExecutor::runPipeline() { // Store how long we were stalled waiting for compilation to finish. result.codegen_microseconds = std::chrono::duration_cast(compilation_done_ts - start_execution_ts).count(); - std::cout << "Found " << compilation_jobs.size() << " JIT fragments \n"; - for (auto& compiled_fragment : compile_state) { assert(compiled_fragment->compiled); compiled_fragment->compiled->setUpState(); @@ -294,7 +292,6 @@ void PipelineExecutor::setUpInterpreted() { std::vector PipelineExecutor::setUpFusedAsync(ExecutionMode mode) { // Create a compile state for the respective JIT interval. 
auto attach_compile_state = [&](size_t start, size_t end) { - std::cout << "Attaching compile state " << start << ", " << end << std::endl; auto repiped = pipe.repipeRequired(start, end); std::pair jit_interval{start, end}; // Create a fragment name that will be unique across different ROF intervals in the pipeline. diff --git a/src/exec/runners/InterpretedRunner.cpp b/src/exec/runners/InterpretedRunner.cpp index 173c2e2..19660ac 100644 --- a/src/exec/runners/InterpretedRunner.cpp +++ b/src/exec/runners/InterpretedRunner.cpp @@ -42,9 +42,9 @@ InterpretedRunner::~InterpretedRunner() { Suboperator::PickMorselResult InterpretedRunner::pickMorsel(size_t thread_id) { if (mode == ExecutionMode::ZeroCopyScan && !pick_from_source_table) { - // If this is a suboperator interpreting a table scan, but not the first one, - // then we should not pick from the table scan. We need to bind to the original - // morsel of the first interpreter instead. + // If this is a suboperator interpreting a table scan, but not the first one, + // then we should not pick from the table scan. We need to bind to the original + // morsel of the first interpreter instead. return Suboperator::PickedMorsel{ .morsel_size = 1, }; diff --git a/src/interpreter/ColumnFilterFragmentizer.cpp b/src/interpreter/ColumnFilterFragmentizer.cpp index 0627df6..e92370f 100644 --- a/src/interpreter/ColumnFilterFragmentizer.cpp +++ b/src/interpreter/ColumnFilterFragmentizer.cpp @@ -14,42 +14,48 @@ const std::vector condition_types = {IR::Bool::build(), IR::Pointer ColumnFilterFragmentizer::ColumnFilterFragmentizer() { // Not self filtering. - for (auto& type: types) { - for (auto& condition_type: condition_types) { + for (auto& type : types) { + for (auto& condition_type : condition_types) { auto& [name, pipe] = pipes.emplace_back(); const auto& filter_iu = generated_ius.emplace_back(condition_type, "filter"); // Note that the void IU will not be produced in repipeAll, i.e. the fragment does not have a FuseChunkSink. 
const auto& pseudo_iu = generated_ius.emplace_back(IR::Void::build(), ""); const auto& target_iu = generated_ius.emplace_back(type, "target_iu"); const auto& target_iu_out = generated_ius.emplace_back(type, "target_iu_filtered"); - pipe.attachSuboperator(ColumnFilterScope::build(nullptr, filter_iu, pseudo_iu)); + auto& filter_scope_subop = pipe.attachSuboperator(ColumnFilterScope::build(nullptr, filter_iu, pseudo_iu)); + auto& filter_scope = reinterpret_cast(filter_scope_subop); auto& filter_subop = pipe.attachSuboperator(ColumnFilterLogic::build(nullptr, pseudo_iu, target_iu, target_iu_out, condition_type, false)); + filter_scope.attachFilterLogicDependency(filter_subop, target_iu); name = filter_subop.id(); } } // Self filtering. - for (auto& condition_type: condition_types) { - auto& [name, pipe] = pipes.emplace_back(); - const auto& filter_iu = generated_ius.emplace_back(condition_type, "filter"); - // Note that the void IU will not be produced in repipeAll, i.e. the fragment does not have a FuseChunkSink. - const auto& pseudo_iu = generated_ius.emplace_back(IR::Void::build(), ""); - const auto& target_iu_out = generated_ius.emplace_back(condition_type, "target_iu_filtered"); - pipe.attachSuboperator(ColumnFilterScope::build(nullptr, filter_iu, pseudo_iu)); - auto& filter_subop = pipe.attachSuboperator(ColumnFilterLogic::build(nullptr, pseudo_iu, filter_iu, target_iu_out, condition_type, true)); - name = filter_subop.id(); + for (auto& condition_type : condition_types) { + auto& [name, pipe] = pipes.emplace_back(); + const auto& filter_iu = generated_ius.emplace_back(condition_type, "filter"); + // Note that the void IU will not be produced in repipeAll, i.e. the fragment does not have a FuseChunkSink. 
+ const auto& pseudo_iu = generated_ius.emplace_back(IR::Void::build(), ""); + const auto& target_iu_out = generated_ius.emplace_back(condition_type, "target_iu_filtered"); + auto& filter_scope_subop = pipe.attachSuboperator(ColumnFilterScope::build(nullptr, filter_iu, pseudo_iu)); + auto& filter_scope = reinterpret_cast(filter_scope_subop); + auto& filter_subop = pipe.attachSuboperator(ColumnFilterLogic::build(nullptr, pseudo_iu, filter_iu, target_iu_out, condition_type, true)); + filter_scope.attachFilterLogicDependency(filter_subop, filter_iu); + name = filter_subop.id(); } // Filter on a ByteArray - This one is special as it returns a Char*. // We only copy over the pointers and not the entire array. { - for (auto& condition_type: condition_types) { + for (auto& condition_type : condition_types) { auto& [name, pipe] = pipes.emplace_back(); const auto& filter_iu = generated_ius.emplace_back(condition_type, "filter"); // Note that the void IU will not be produced in repipeAll, i.e. the fragment does not have a FuseChunkSink. 
const auto& pseudo_iu = generated_ius.emplace_back(IR::Void::build(), ""); const auto& target_iu = generated_ius.emplace_back(IR::ByteArray::build(0), "target_iu"); const auto& target_iu_out = generated_ius.emplace_back(IR::Pointer::build(IR::Char::build()), "target_iu_filtered"); - pipe.attachSuboperator(ColumnFilterScope::build(nullptr, filter_iu, pseudo_iu)); + auto& filter_scope_subop = pipe.attachSuboperator(ColumnFilterScope::build(nullptr, filter_iu, pseudo_iu)); + auto& filter_scope = reinterpret_cast(filter_scope_subop); auto& filter_subop = pipe.attachSuboperator(ColumnFilterLogic::build(nullptr, pseudo_iu, target_iu, target_iu_out, condition_type, false)); + filter_scope.attachFilterLogicDependency(filter_subop, target_iu); name = filter_subop.id(); } } diff --git a/test/suboperators/test_runtime_param.cpp b/test/suboperators/test_runtime_param.cpp index 4f542b1..62d4342 100644 --- a/test/suboperators/test_runtime_param.cpp +++ b/test/suboperators/test_runtime_param.cpp @@ -10,12 +10,12 @@ namespace { // Test the runtime parameter code generation with a runtime param that gets loaded at runtime. 
TEST(test_runtime_param, runtime_expression_no_val) { Pipeline pipe; - CompilationContext context("runtime_param_test", pipe); IU source(IR::UnsignedInt::build(4)); IU provided(IR::UnsignedInt::build(4)); auto l_expr = RuntimeExpressionSubop::build(nullptr, {&provided}, {&source}, ExpressionOp::ComputeNode::Type::Add, IR::UnsignedInt::build(4)); pipe.attachSuboperator(l_expr); - pipe.repipeAll(0, 1); + auto repiped = pipe.repipeAll(0, 1); + CompilationContext context("runtime_param_test", *repiped); EXPECT_NO_THROW(context.compile()); } @@ -24,16 +24,16 @@ TEST(test_runtime_param, runtime_expression_val) { RuntimeExpressionParams params; params.dataSet(IR::UI<4>::build(4)); Pipeline pipe; - CompilationContext context("runtime_param_test", pipe); IU source(IR::UnsignedInt::build(4)); IU provided(IR::UnsignedInt::build(4)); auto l_expr = RuntimeExpressionSubop::build(nullptr, {&provided}, {&source}, ExpressionOp::ComputeNode::Type::Add, IR::UnsignedInt::build(4)); static_cast(*l_expr).attachRuntimeParams(std::move(params)); pipe.attachSuboperator(l_expr); - pipe.repipeAll(0, 1); + auto repiped = pipe.repipeAll(0, 1); + CompilationContext context("runtime_param_test", *repiped); EXPECT_NO_THROW(context.compile()); } } -} \ No newline at end of file +} diff --git a/tools/inkfuse_bench.cpp b/tools/inkfuse_bench.cpp index 396a5f4..662b00b 100644 --- a/tools/inkfuse_bench.cpp +++ b/tools/inkfuse_bench.cpp @@ -52,7 +52,6 @@ const std::vector> backe {"fused", PipelineExecutor::ExecutionMode::Fused}, {"rof", PipelineExecutor::ExecutionMode::ROF}, }; - } int main(int argc, char* argv[]) {