Skip to content

Commit

Permalink
Fix Filter Code Generation
Browse files Browse the repository at this point in the history
Our ROF implementation showed some correctness issues in how we
implement code generation for filters.

A filter looks as follows:
```
    /- IU Prov 1 ----------------> Filter 1 ---> Sink 1
Src                            /
    \- IU Prov 2 -> FilterScope ---> Filter 2 -> Sink 2
               \-----------------/
```

The IUs going from FilterScope to the filters are void-typed pseudo IUs.
The problem was that we could run into cases where FilterScope would
generate its nested `IF` before both IU providers were opened.

In those cases, we would generate the IU provider iterator only within
the nested filter scope, causing it to "lag" behind the other iterator,
producing incorrect results.

The core problem is that we do not model a code generation dependency
between `IU Prov 1 -> FilterScope`.

This commit ensures that all input IU providers generate their code
before the if is opened. Now, `Filter 1` and `Filter 2` only request
code generation of `FilterScope`, and `FilterScope` requests generating
both IU providers.

This is done in somewhat hacky way, if we rebuilt the system today we
should not model code generation dependencies through void-typed pseudo
IUs. We should instead probably model IU and codegen dependencies
separately.
  • Loading branch information
wagjamin committed Nov 3, 2023
1 parent 33e1b93 commit 3264623
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 60 deletions.
7 changes: 5 additions & 2 deletions src/algebra/Filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,16 @@ void Filter::decay(PipelineDAG& dag) const
children[0]->decay(dag);
auto& pipe = dag.getCurrentPipeline();
// Attach the control flow sub-operator.
auto scope = ColumnFilterScope::build(this, filter_iu, pseudo_iu);
pipe.attachSuboperator(std::move(scope));
auto& scope_supop = pipe.attachSuboperator(ColumnFilterScope::build(this, filter_iu, pseudo_iu));
auto& scope = reinterpret_cast<ColumnFilterScope&>(scope_supop);
// Attach the logic operators performing the copies.
for (size_t k = 0; k < redefined.size(); ++k) {
const IU& old_iu = *to_redefine[k];
const IU& new_iu = redefined[k];
auto logic = ColumnFilterLogic::build(this, pseudo_iu, old_iu, new_iu);
// Attach filter dependency to the scope suboperator. This makes sure the source IUs
// are generated before the filter.
scope.attachFilterLogicDependency(*logic, old_iu);
pipe.attachSuboperator(std::move(logic));
}
}
Expand Down
9 changes: 6 additions & 3 deletions src/algebra/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,14 +286,17 @@ void Join::decayPkJoin(inkfuse::PipelineDAG& dag) const {
}

// 2.3 Filter on probe matches.
probe_pipe.attachSuboperator(ColumnFilterScope::build(this, *lookup_right, *filter_pseudo_iu));
auto& filter_scope_subop = probe_pipe.attachSuboperator(ColumnFilterScope::build(this, *lookup_right, *filter_pseudo_iu));
auto& filter_scope = reinterpret_cast<ColumnFilterScope&>(filter_scope_subop);
// The filter on the build site filters "itself". This has some repercussions on the repiping
// behaviour of the suboperator and needs to be passed explicitly.
probe_pipe.attachSuboperator(ColumnFilterLogic::build(this, *filter_pseudo_iu, *lookup_right, *filtered_build, /* filter_type= */ lookup_right->type, /* filters_itself= */ true));
auto& filter_1 = probe_pipe.attachSuboperator(ColumnFilterLogic::build(this, *filter_pseudo_iu, *lookup_right, *filtered_build, /* filter_type= */ lookup_right->type, /* filters_itself= */ true));
filter_scope.attachFilterLogicDependency(filter_1, *lookup_right);
if (type != JoinType::LeftSemi) {
// If we need to produce columns on the probe side, we also have to filter the probe result.
// Note: the filtered ByteArray from the probe side becomes a Char* after filtering.
probe_pipe.attachSuboperator(ColumnFilterLogic::build(this, *filter_pseudo_iu, *scratch_pad_right, *filtered_probe, /* filter_type_= */ lookup_right->type));
auto& filter_2 = probe_pipe.attachSuboperator(ColumnFilterLogic::build(this, *filter_pseudo_iu, *scratch_pad_right, *filtered_probe, /* filter_type_= */ lookup_right->type));
filter_scope.attachFilterLogicDependency(filter_2, *scratch_pad_right);
}

// 2.4 Unpack everything.
Expand Down
58 changes: 34 additions & 24 deletions src/algebra/suboperators/ColumnFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,35 @@

namespace inkfuse {

SuboperatorArc ColumnFilterScope::build(const RelAlgOp* source_, const IU& filter_iu_, const IU& pseudo)
{
SuboperatorArc ColumnFilterScope::build(const RelAlgOp* source_, const IU& filter_iu_, const IU& pseudo) {
return SuboperatorArc{new ColumnFilterScope(source_, filter_iu_, pseudo)};
}

ColumnFilterScope::ColumnFilterScope(const RelAlgOp* source_, const IU& filter_iu_, const IU& pseudo)
: TemplatedSuboperator<EmptyState>(source_, std::vector<const IU*>{&pseudo}, std::vector<const IU*>{&filter_iu_})
{
: TemplatedSuboperator<EmptyState>(source_, std::vector<const IU*>{&pseudo}, std::vector<const IU*>{&filter_iu_}) {
}

void ColumnFilterScope::consumeAllChildren(CompilationContext& context)
{
void ColumnFilterScope::open(CompilationContext& context) {
if (filter_logic_dependencies.empty()) {
throw std::runtime_error("ColumnFilterScope must be provided with all filter dependencies.");
}
// We first request all dependencies that the following `ColumnFilterLogics` will have.
// This ensures that their iterators are generated in the outer scope.
for (auto& [op, iu] : filter_logic_dependencies) {
context.requestIU(*op, *iu);
}
// No we request our actual filter IU. Once that code was generated we enter `consumeAllChildren`.
for (const IU* in : source_ius) {
context.requestIU(*this, *in);
}
}

void ColumnFilterScope::attachFilterLogicDependency(Suboperator& subop, const IU& iu) {
assert(dynamic_cast<ColumnFilterLogic*>(&subop));
filter_logic_dependencies.push_back({&subop, &iu});
}

void ColumnFilterScope::consumeAllChildren(CompilationContext& context) {
auto& builder = context.getFctBuilder();
const auto& program = context.getProgram();

Expand All @@ -33,28 +50,24 @@ void ColumnFilterScope::consumeAllChildren(CompilationContext& context)
}
}

void ColumnFilterScope::close(CompilationContext& context)
{
void ColumnFilterScope::close(CompilationContext& context) {
// We can now close the sub-operator, this will terminate the if statement
// and reinstall the original block.
opt_if->End();
opt_if.reset();
context.notifyOpClosed(*this);
}

std::string ColumnFilterScope::id() const
{
std::string ColumnFilterScope::id() const {
return "ColumnFilterScope";
}

SuboperatorArc ColumnFilterLogic::build(const RelAlgOp* source_, const IU& pseudo, const IU& incoming_, const IU& redefined, IR::TypeArc filter_type_, bool filters_itself_)
{
SuboperatorArc ColumnFilterLogic::build(const RelAlgOp* source_, const IU& pseudo, const IU& incoming_, const IU& redefined, IR::TypeArc filter_type_, bool filters_itself_) {
return SuboperatorArc{new ColumnFilterLogic(source_, pseudo, incoming_, redefined, std::move(filter_type_), filters_itself_)};
}

ColumnFilterLogic::ColumnFilterLogic(const RelAlgOp* source_, const IU& pseudo, const IU& incoming, const IU& redefined, IR::TypeArc filter_type_, bool filters_itself_)
: TemplatedSuboperator<EmptyState>(source_, std::vector<const IU*>{&redefined}, std::vector<const IU*>{&pseudo, &incoming}), filter_type(std::move(filter_type_)), filters_itself(filters_itself_)
{
: TemplatedSuboperator<EmptyState>(source_, std::vector<const IU*>{&redefined}, std::vector<const IU*>{&pseudo, &incoming}), filter_type(std::move(filter_type_)), filters_itself(filters_itself_) {
// When filtering a ByteArray something subtle happens:
// The result column becomes a char*. This way we don't have to copy the entire byte array, but rather
// just pointers. An alternative implementation would be to have variable size sinks and to do
Expand All @@ -66,18 +79,16 @@ ColumnFilterLogic::ColumnFilterLogic(const RelAlgOp* source_, const IU& pseudo,
}
}

void ColumnFilterLogic::open(CompilationContext& context)
{
// First request the incoming IU that gets filtered. This is extremely important as their iterator
void ColumnFilterLogic::open(CompilationContext& context) {
// We do not request incoming IU that gets filtered. This is extremely important as their iterator
// should be generated outside of the `if`. If we don't descend the DAG this way, then we might generate
// the variable-size state iterator inside the nested control flow.
context.requestIU(*this, *source_ius[1]);
// Only now request generation of the `if`. We know that the variable-sized iterators are outside the `if`.

// Only request generation of the `if`. The ColumnFilterScope will generate the input IU that will be filtered.
context.requestIU(*this, *source_ius[0]);
}

void ColumnFilterLogic::consumeAllChildren(CompilationContext& context)
{
void ColumnFilterLogic::consumeAllChildren(CompilationContext& context) {
auto& builder = context.getFctBuilder();

// Get the definition of the filter IU - this is the second source IU.
Expand All @@ -97,13 +108,12 @@ void ColumnFilterLogic::consumeAllChildren(CompilationContext& context)
context.notifyIUsReady(*this);
}

std::string ColumnFilterLogic::id() const
{
std::string ColumnFilterLogic::id() const {
if (filters_itself) {
return "ColumnSelfFilterLogic_" + filter_type->id() + "_" + source_ius[1]->type->id();
} else {
return "ColumnFilterLogic_" + filter_type->id() + "_" + source_ius[1]->type->id();
}
}

}
}
13 changes: 9 additions & 4 deletions src/algebra/suboperators/ColumnFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ namespace inkfuse {

/// Scoping operator building the if statement needed in a filter.
struct ColumnFilterScope : public TemplatedSuboperator<EmptyState> {

/// Set up a ColumnFilterScope that consumes the filter IU and defines a new pseudo IU.
static SuboperatorArc build(const RelAlgOp* source_, const IU& filter_iu_, const IU& pseudo);

Expand All @@ -30,6 +29,11 @@ struct ColumnFilterScope : public TemplatedSuboperator<EmptyState> {
void consumeAllChildren(CompilationContext& context) override;
/// On close we can terminate the if block we have created for downstream consumers.
void close(CompilationContext& context) override;
/// On open we request the filter iu, but also all dependent IUs from ColumnFilterLogic.
void open(CompilationContext& context) override;

/// Attach a dependency of a future ColumnFilterLogic.
void attachFilterLogicDependency(Suboperator& subop, const IU& iu);

std::string id() const override;

Expand All @@ -38,11 +42,14 @@ struct ColumnFilterScope : public TemplatedSuboperator<EmptyState> {

/// In-flight if statement being generated.
std::optional<IR::If> opt_if;
/// The `ColumnFilterScope` requests the input IUs for the `ColumnFilterLogic`.
/// This is done to make sure that their iterators are generated outside of the
/// nested scope created by the `if`.
std::vector<std::pair<Suboperator*, const IU*>> filter_logic_dependencies;
};

/// Logic operator redefining the IU as a copy of the old one.
struct ColumnFilterLogic : public TemplatedSuboperator<EmptyState> {

/// Set up a ColumnFilterLogic that consumes the ColumnFilterScope pseudo IU and redefines the incoming one.
static SuboperatorArc build(const RelAlgOp* source_, const IU& pseudo, const IU& incoming, const IU& redefined, IR::TypeArc filter_type_ = IR::Bool::build(), bool filters_itself = false);

Expand All @@ -65,9 +72,7 @@ struct ColumnFilterLogic : public TemplatedSuboperator<EmptyState> {
/// Does this filter filter itself? I.e. the filter IU is the same one as the
/// incoming one that gets redefined?
bool filters_itself;

};

}

#endif //INKFUSE_COLUMNCOMPACTOR_H
2 changes: 1 addition & 1 deletion src/codegen/backend_c/BackendC.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace inkfuse {

namespace {

static constexpr bool debug_mode = true;
static constexpr bool debug_mode = false;

/// Generate the path for the c program.
std::string path(std::string_view program_name) {
Expand Down
3 changes: 0 additions & 3 deletions src/exec/PipelineExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,6 @@ PipelineExecutor::PipelineStats PipelineExecutor::runPipeline() {
// Store how long we were stalled waiting for compilation to finish.
result.codegen_microseconds = std::chrono::duration_cast<std::chrono::microseconds>(compilation_done_ts - start_execution_ts).count();

std::cout << "Found " << compilation_jobs.size() << " JIT fragments \n";

for (auto& compiled_fragment : compile_state) {
assert(compiled_fragment->compiled);
compiled_fragment->compiled->setUpState();
Expand Down Expand Up @@ -294,7 +292,6 @@ void PipelineExecutor::setUpInterpreted() {
std::vector<std::thread> PipelineExecutor::setUpFusedAsync(ExecutionMode mode) {
// Create a compile state for the respective JIT interval.
auto attach_compile_state = [&](size_t start, size_t end) {
std::cout << "Attaching compile state " << start << ", " << end << std::endl;
auto repiped = pipe.repipeRequired(start, end);
std::pair<size_t, size_t> jit_interval{start, end};
// Create a fragment name that will be unique across different ROF intervals in the pipeline.
Expand Down
6 changes: 3 additions & 3 deletions src/exec/runners/InterpretedRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ InterpretedRunner::~InterpretedRunner() {

Suboperator::PickMorselResult InterpretedRunner::pickMorsel(size_t thread_id) {
if (mode == ExecutionMode::ZeroCopyScan && !pick_from_source_table) {
// If this is a suboperator interpreting a table scan, but not the first one,
// then we should not pick from the table scan. We need to bind to the original
// morsel of the first interpreter instead.
// If this is a suboperator interpreting a table scan, but not the first one,
// then we should not pick from the table scan. We need to bind to the original
// morsel of the first interpreter instead.
return Suboperator::PickedMorsel{
.morsel_size = 1,
};
Expand Down
34 changes: 20 additions & 14 deletions src/interpreter/ColumnFilterFragmentizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,42 +14,48 @@ const std::vector<IR::TypeArc> condition_types = {IR::Bool::build(), IR::Pointer

ColumnFilterFragmentizer::ColumnFilterFragmentizer() {
// Not self filtering.
for (auto& type: types) {
for (auto& condition_type: condition_types) {
for (auto& type : types) {
for (auto& condition_type : condition_types) {
auto& [name, pipe] = pipes.emplace_back();
const auto& filter_iu = generated_ius.emplace_back(condition_type, "filter");
// Note that the void IU will not be produced in repipeAll, i.e. the fragment does not have a FuseChunkSink.
const auto& pseudo_iu = generated_ius.emplace_back(IR::Void::build(), "");
const auto& target_iu = generated_ius.emplace_back(type, "target_iu");
const auto& target_iu_out = generated_ius.emplace_back(type, "target_iu_filtered");
pipe.attachSuboperator(ColumnFilterScope::build(nullptr, filter_iu, pseudo_iu));
auto& filter_scope_subop = pipe.attachSuboperator(ColumnFilterScope::build(nullptr, filter_iu, pseudo_iu));
auto& filter_scope = reinterpret_cast<ColumnFilterScope&>(filter_scope_subop);
auto& filter_subop = pipe.attachSuboperator(ColumnFilterLogic::build(nullptr, pseudo_iu, target_iu, target_iu_out, condition_type, false));
filter_scope.attachFilterLogicDependency(filter_subop, target_iu);
name = filter_subop.id();
}
}
// Self filtering.
for (auto& condition_type: condition_types) {
auto& [name, pipe] = pipes.emplace_back();
const auto& filter_iu = generated_ius.emplace_back(condition_type, "filter");
// Note that the void IU will not be produced in repipeAll, i.e. the fragment does not have a FuseChunkSink.
const auto& pseudo_iu = generated_ius.emplace_back(IR::Void::build(), "");
const auto& target_iu_out = generated_ius.emplace_back(condition_type, "target_iu_filtered");
pipe.attachSuboperator(ColumnFilterScope::build(nullptr, filter_iu, pseudo_iu));
auto& filter_subop = pipe.attachSuboperator(ColumnFilterLogic::build(nullptr, pseudo_iu, filter_iu, target_iu_out, condition_type, true));
name = filter_subop.id();
for (auto& condition_type : condition_types) {
auto& [name, pipe] = pipes.emplace_back();
const auto& filter_iu = generated_ius.emplace_back(condition_type, "filter");
// Note that the void IU will not be produced in repipeAll, i.e. the fragment does not have a FuseChunkSink.
const auto& pseudo_iu = generated_ius.emplace_back(IR::Void::build(), "");
const auto& target_iu_out = generated_ius.emplace_back(condition_type, "target_iu_filtered");
auto& filter_scope_subop = pipe.attachSuboperator(ColumnFilterScope::build(nullptr, filter_iu, pseudo_iu));
auto& filter_scope = reinterpret_cast<ColumnFilterScope&>(filter_scope_subop);
auto& filter_subop = pipe.attachSuboperator(ColumnFilterLogic::build(nullptr, pseudo_iu, filter_iu, target_iu_out, condition_type, true));
filter_scope.attachFilterLogicDependency(filter_subop, filter_iu);
name = filter_subop.id();
}
// Filter on a ByteArray - This one is special as it returns a Char*.
// We only copy over the pointers and not the entire array.
{
for (auto& condition_type: condition_types) {
for (auto& condition_type : condition_types) {
auto& [name, pipe] = pipes.emplace_back();
const auto& filter_iu = generated_ius.emplace_back(condition_type, "filter");
// Note that the void IU will not be produced in repipeAll, i.e. the fragment does not have a FuseChunkSink.
const auto& pseudo_iu = generated_ius.emplace_back(IR::Void::build(), "");
const auto& target_iu = generated_ius.emplace_back(IR::ByteArray::build(0), "target_iu");
const auto& target_iu_out = generated_ius.emplace_back(IR::Pointer::build(IR::Char::build()), "target_iu_filtered");
pipe.attachSuboperator(ColumnFilterScope::build(nullptr, filter_iu, pseudo_iu));
auto& filter_scope_subop = pipe.attachSuboperator(ColumnFilterScope::build(nullptr, filter_iu, pseudo_iu));
auto& filter_scope = reinterpret_cast<ColumnFilterScope&>(filter_scope_subop);
auto& filter_subop = pipe.attachSuboperator(ColumnFilterLogic::build(nullptr, pseudo_iu, target_iu, target_iu_out, condition_type, false));
filter_scope.attachFilterLogicDependency(filter_subop, target_iu);
name = filter_subop.id();
}
}
Expand Down
10 changes: 5 additions & 5 deletions test/suboperators/test_runtime_param.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ namespace {
// Test the runtime parameter code generation with a runtime param that gets loaded at runtime.
TEST(test_runtime_param, runtime_expression_no_val) {
Pipeline pipe;
CompilationContext context("runtime_param_test", pipe);
IU source(IR::UnsignedInt::build(4));
IU provided(IR::UnsignedInt::build(4));
auto l_expr = RuntimeExpressionSubop::build(nullptr, {&provided}, {&source}, ExpressionOp::ComputeNode::Type::Add, IR::UnsignedInt::build(4));
pipe.attachSuboperator(l_expr);
pipe.repipeAll(0, 1);
auto repiped = pipe.repipeAll(0, 1);
CompilationContext context("runtime_param_test", *repiped);
EXPECT_NO_THROW(context.compile());
}

Expand All @@ -24,16 +24,16 @@ TEST(test_runtime_param, runtime_expression_val) {
RuntimeExpressionParams params;
params.dataSet(IR::UI<4>::build(4));
Pipeline pipe;
CompilationContext context("runtime_param_test", pipe);
IU source(IR::UnsignedInt::build(4));
IU provided(IR::UnsignedInt::build(4));
auto l_expr = RuntimeExpressionSubop::build(nullptr, {&provided}, {&source}, ExpressionOp::ComputeNode::Type::Add, IR::UnsignedInt::build(4));
static_cast<RuntimeExpressionSubop&>(*l_expr).attachRuntimeParams(std::move(params));
pipe.attachSuboperator(l_expr);
pipe.repipeAll(0, 1);
auto repiped = pipe.repipeAll(0, 1);
CompilationContext context("runtime_param_test", *repiped);
EXPECT_NO_THROW(context.compile());
}

}

}
}
1 change: 0 additions & 1 deletion tools/inkfuse_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ const std::vector<std::pair<std::string, PipelineExecutor::ExecutionMode>> backe
{"fused", PipelineExecutor::ExecutionMode::Fused},
{"rof", PipelineExecutor::ExecutionMode::ROF},
};

}

int main(int argc, char* argv[]) {
Expand Down

0 comments on commit 3264623

Please sign in to comment.