Implement Relaxed Operator Fusion
This commit implements Relaxed Operator Fusion (ROF) in InkFuse. We extend all
existing tests to also exercise the new ROF backend.

Incremental Fusion conceptually supports relaxed operator fusion. The tuple
buffers that we can install between arbitrary suboperators are similar
to ROF staging points.

As a result, integrating ROF into InkFuse requires only a relatively small code change.
The main modifications are found in the `PipelineExecutor`.

At a high-level, ROF in InkFuse goes through the following steps:
1. A `Suboperator` can attach a `ROFStrategy` in its
   `OptimizationProperties`. This indicates whether the suboperator
   prefers compilation or vectorization.
2. The optimization properties become easy to use through an
   `ROFScopeGuard` in `Pipeline.h`. When we decay an operator into
   suboperators, this `ROFScopeGuard` can simply be set up during
   suboperator creation and indicates that the generated suboperators
   should all be vectorized.
3. The `PipelineExecutor` now splits the topological order of the
   suboperators into maximal connected components that prefer either
   vectorization or JIT compilation. The JIT components are then
   compiled ahead of time.
4. The ROF backend then iterates through the suboperator topological
   sort. Any interpreted component uses the pre-compiled primitives;
   any compiled component uses the JIT code. A sketch of this
   splitting follows below.

As a next step, we will extend our benchmarking binaries with ROF
support and start measuring the performance of the ROF backend.
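
For illustration, here is a minimal, self-contained sketch of the splitting in steps 3 and 4. It is not the code of this commit (the actual logic lives in the `PipelineExecutor`, which is not part of the excerpt below); the names `ROFComponent` and `splitIntoComponents` are purely illustrative.

```cpp
#include <cstddef>
#include <vector>

// Illustrative sketch only: split a suboperator topological order into maximal
// runs ("components") that are either vectorized or JIT-compiled, based on the
// Begin/EndVectorized markers attached to the suboperators.
enum class ROFStrategy { Default, BeginVectorized, EndVectorized };

struct ROFComponent {
   std::size_t begin;  // First suboperator index of the component (inclusive).
   std::size_t end;    // One past the last suboperator index.
   bool vectorized;    // true -> interpret with precompiled primitives, false -> JIT.
};

std::vector<ROFComponent> splitIntoComponents(const std::vector<ROFStrategy>& order) {
   std::vector<ROFComponent> components;
   std::size_t start = 0;
   bool vectorized = false;
   for (std::size_t idx = 0; idx < order.size(); ++idx) {
      if (order[idx] == ROFStrategy::BeginVectorized && !vectorized) {
         if (idx > start) {
            // Close the preceding JIT component.
            components.push_back({start, idx, /*vectorized=*/false});
         }
         start = idx;
         vectorized = true;
      }
      if (order[idx] == ROFStrategy::EndVectorized && vectorized) {
         // Close the vectorized component, including the current suboperator.
         components.push_back({start, idx + 1, /*vectorized=*/true});
         start = idx + 1;
         vectorized = false;
      }
   }
   if (start < order.size()) {
      // Trailing component keeps whatever mode was active.
      components.push_back({start, order.size(), vectorized});
   }
   return components;
}
```

The JIT components produced this way would be compiled ahead of time, while the vectorized components fall back to the pre-compiled interpreter primitives, as described in step 4.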
wagjamin committed Nov 3, 2023
1 parent 5e38c11 commit 33e1b93
Showing 39 changed files with 615 additions and 260 deletions.
4 changes: 3 additions & 1 deletion CMakeLists.txt
@@ -13,7 +13,9 @@ set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -rdynamic -stdlib=libc++")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -rdynamic -g -O0 -fsanitize=address")
# set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -rdynamic -g -O0 -fsanitize=address")
# Generate DWARF 4 in debug to work on older GDB versions
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -rdynamic -g -gdwarf-4 -O0 -fsanitize=address")
set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "${CMAKE_CXX_FLAGS_RELWITHDEBINFO}")

# ---------------------------------------------------------------------------
23 changes: 20 additions & 3 deletions src/algebra/CompilationContext.cpp
@@ -1,6 +1,7 @@
#include "algebra/CompilationContext.h"
#include "algebra/Pipeline.h"
#include "algebra/suboperators/Suboperator.h"
#include "algebra/suboperators/row_layout/KeyPackerSubop.h"
#include <iostream>
#include <sstream>

@@ -18,10 +19,22 @@ void CompilationContext::compile() {
// Create the builder.
builder.emplace(*program, fct_name);
// Collect all sinks.
std::set<Suboperator*> sinks;
std::vector<Suboperator*> sinks;
std::for_each(pipeline.suboperators.begin(), pipeline.suboperators.end(), [&](const SuboperatorArc& op) {
if (!op->isSink() && !pipeline.graph.outgoing_edges.contains(op.get())) {
// If there is no outgoing edge we still need to produce code for the suboperator.
// This can e.g. happen during ROF if we cut at an outgoing strong edge.
// The most common example is cutting right after key packing.
// We need to open these suboperators before the actual sinks, as they open
// loop scopes that the sinks may depend on.
// Change the order for TPC-H Q3 and look at pipeline 1 to understand the problem.
sinks.push_back(op.get());
producing_no_request.insert(&*op);
}
});
std::for_each(pipeline.suboperators.begin(), pipeline.suboperators.end(), [&](const SuboperatorArc& op) {
if (op->isSink()) {
sinks.insert(op.get());
sinks.push_back(op.get());
}
});
// Open all sinks.
@@ -50,6 +63,11 @@ void CompilationContext::notifyOpClosed(Suboperator& op) {
void CompilationContext::notifyIUsReady(Suboperator& op) {
// Fetch the sub-operator which requested the given IU.
computed.emplace(&op);
if (producing_no_request.count(&op)) {
// There is no suboperator in the current pipeline that needs the
// output IUs.
return;
}
auto [requestor, iu] = requests[&op];
// Remove the now serviced request from the map again.
requests.erase(&op);
@@ -170,5 +188,4 @@ IR::FunctionBuilder CompilationContext::createFctBuilder(IR::IRBuilder& program,
auto return_type = IR::UnsignedInt::build(1);
return program.createFunctionBuilder(std::make_shared<IR::Function>(std::move(fct_name), std::move(args), std::move(constness), std::move(return_type)));
}

}
3 changes: 3 additions & 0 deletions src/algebra/CompilationContext.h
@@ -101,6 +101,9 @@ struct CompilationContext {
std::unordered_map<Suboperator*, NodeProperties> properties;
/// The open IU requests which need to be mapped. (from -> to)
std::unordered_map<Suboperator*, std::pair<Suboperator*, const IU*>> requests;
/// Suboperators that produce IUs for which no requestor exists.
/// These come from suboperators that have only outgoing pseudo IUs.
std::unordered_set<const Suboperator*> producing_no_request;
/// IU declarations.
std::map<const IU*, const IR::Stmt*> iu_declarations;
};
13 changes: 6 additions & 7 deletions src/algebra/ExpressionOp.cpp
@@ -17,13 +17,12 @@ IR::TypeArc ExpressionOp::derive(ComputeNode::Type code, const std::vector<Node*

IR::TypeArc ExpressionOp::derive(ComputeNode::Type code, const std::vector<IR::TypeArc>& types) {
// Operations that return a boolean as result.
static std::unordered_set<ComputeNode::Type> bool_returning {
ComputeNode::Type::Eq, ComputeNode::Type::Neq,
ComputeNode::Type::Less, ComputeNode::Type::LessEqual,
static std::unordered_set<ComputeNode::Type> bool_returning{
ComputeNode::Type::Eq, ComputeNode::Type::Neq,
ComputeNode::Type::Less, ComputeNode::Type::LessEqual,
ComputeNode::Type::Greater, ComputeNode::Type::GreaterEqual,
ComputeNode::Type::StrEquals, ComputeNode::Type::And,
ComputeNode::Type::Or
};
ComputeNode::Type::Or};
if (code == ComputeNode::Type::Hash) {
return IR::UnsignedInt::build(8);
}
@@ -43,8 +42,7 @@ ExpressionOp::ExpressionOp(std::vector<std::unique_ptr<RelAlgOp>> children_, std
}
}

std::unique_ptr<ExpressionOp> ExpressionOp::build(std::vector<std::unique_ptr<RelAlgOp>> children_, std::string op_name_, std::vector<Node*> out_, std::vector<NodePtr> nodes_)
{
std::unique_ptr<ExpressionOp> ExpressionOp::build(std::vector<std::unique_ptr<RelAlgOp>> children_, std::string op_name_, std::vector<Node*> out_, std::vector<NodePtr> nodes_) {
return std::make_unique<ExpressionOp>(std::move(children_), std::move(op_name_), std::move(out_), std::move(nodes_));
}

@@ -71,6 +69,7 @@ void ExpressionOp::decay(PipelineDAG& dag) const {
for (const auto& child : children) {
child->decay(dag);
}

// The set stores which expression suboperators were created already.
std::unordered_map<Node*, const IU*> built;
for (const auto root : out) {
116 changes: 62 additions & 54 deletions src/algebra/Join.cpp
@@ -259,71 +259,79 @@ void Join::decayPkJoin(inkfuse::PipelineDAG& dag) const {
}

// 2.2 Probe.
std::vector<const IU*> pseudo;
for (const auto& pseudo_iu : right_pseudo_ius) {
pseudo.push_back(&pseudo_iu);
}
{
{
// Perform the actual lookup in a fully vectorized fashion.
Pipeline::ROFScopeGuard rof_guard{probe_pipe};

// 2.2.1 Compute the hash.
probe_pipe.attachSuboperator(RuntimeFunctionSubop::htHash<AtomicHashTable<SimpleKeyComparator>>(this, *hash_right, *scratch_pad_right, std::move(pseudo), &ht_state));
std::vector<const IU*> pseudo;
for (const auto& pseudo_iu : right_pseudo_ius) {
pseudo.push_back(&pseudo_iu);
}

// 2.2.2 Prefetch the slot.
probe_pipe.attachSuboperator(RuntimeFunctionSubop::htPrefetch<AtomicHashTable<SimpleKeyComparator>>(this, &*prefetch_pseudo, *hash_right, &ht_state));
// 2.2.1 Compute the hash.
probe_pipe.attachSuboperator(RuntimeFunctionSubop::htHash<AtomicHashTable<SimpleKeyComparator>>(this, *hash_right, *scratch_pad_right, std::move(pseudo), &ht_state));

// 2.2.3 Perform the lookup.
if (type == JoinType::LeftSemi) {
// Lookup on a slot disables the slot, giving semi-join behaviour.
probe_pipe.attachSuboperator(RuntimeFunctionSubop::htLookupWithHash<AtomicHashTable<SimpleKeyComparator>, true>(this, *lookup_right, *scratch_pad_right, *hash_right, &*prefetch_pseudo, &ht_state));
} else {
// Regular lookup that does not disable slots.
probe_pipe.attachSuboperator(RuntimeFunctionSubop::htLookupWithHash<AtomicHashTable<SimpleKeyComparator>, false>(this, *lookup_right, *scratch_pad_right, *hash_right, &*prefetch_pseudo, &ht_state));
}
// 2.2.2 Prefetch the slot.
probe_pipe.attachSuboperator(RuntimeFunctionSubop::htPrefetch<AtomicHashTable<SimpleKeyComparator>>(this, &*prefetch_pseudo, *hash_right, &ht_state));

// 2.3 Filter on probe matches.
probe_pipe.attachSuboperator(ColumnFilterScope::build(this, *lookup_right, *filter_pseudo_iu));
// The filter on the build side filters "itself". This has some repercussions on the repiping
// behaviour of the suboperator and needs to be passed explicitly.
probe_pipe.attachSuboperator(ColumnFilterLogic::build(this, *filter_pseudo_iu, *lookup_right, *filtered_build, /* filter_type= */ lookup_right->type, /* filters_itself= */ true));
if (type != JoinType::LeftSemi) {
// If we need to produce columns on the probe side, we also have to filter the probe result.
// Note: the filtered ByteArray from the probe side becomes a Char* after filtering.
probe_pipe.attachSuboperator(ColumnFilterLogic::build(this, *filter_pseudo_iu, *scratch_pad_right, *filtered_probe, /* filter_type_= */ lookup_right->type));
}
// 2.2.3 Perform the lookup.
if (type == JoinType::LeftSemi) {
// Lookup on a slot disables the slot, giving semi-join behaviour.
probe_pipe.attachSuboperator(RuntimeFunctionSubop::htLookupWithHash<AtomicHashTable<SimpleKeyComparator>, true>(this, *lookup_right, *scratch_pad_right, *hash_right, &*prefetch_pseudo, &ht_state));
} else {
// Regular lookup that does not disable slots.
probe_pipe.attachSuboperator(RuntimeFunctionSubop::htLookupWithHash<AtomicHashTable<SimpleKeyComparator>, false>(this, *lookup_right, *scratch_pad_right, *hash_right, &*prefetch_pseudo, &ht_state));
}
}

// 2.4 Unpack everything.
// 2.4.1 Unpack Build Side IUs
size_t build_unpack_offset = 0;
for (const auto& iu : keys_left_out) {
auto& unpacker = probe_pipe.attachSuboperator(KeyUnpackerSubop::build(this, *filtered_build, iu));
KeyPackingRuntimeParams param;
param.offsetSet(IR::UI<2>::build(build_unpack_offset));
reinterpret_cast<KeyUnpackerSubop&>(unpacker).attachRuntimeParams(std::move(param));
build_unpack_offset += iu.type->numBytes();
}
for (const auto& iu : payload_left_out) {
auto& unpacker = probe_pipe.attachSuboperator(KeyUnpackerSubop::build(this, *filtered_build, iu));
KeyPackingRuntimeParams param;
param.offsetSet(IR::UI<2>::build(build_unpack_offset));
reinterpret_cast<KeyUnpackerSubop&>(unpacker).attachRuntimeParams(std::move(param));
build_unpack_offset += iu.type->numBytes();
}
// 2.4.2 Unpack Probe Side IUs. Not needed for semi joins.
if (type != JoinType::LeftSemi) {
size_t probe_unpack_offset = 0;
for (const auto& iu : keys_right_out) {
auto& unpacker = probe_pipe.attachSuboperator(KeyUnpackerSubop::build(this, *filtered_probe, iu));
// 2.3 Filter on probe matches.
probe_pipe.attachSuboperator(ColumnFilterScope::build(this, *lookup_right, *filter_pseudo_iu));
// The filter on the build side filters "itself". This has some repercussions on the repiping
// behaviour of the suboperator and needs to be passed explicitly.
probe_pipe.attachSuboperator(ColumnFilterLogic::build(this, *filter_pseudo_iu, *lookup_right, *filtered_build, /* filter_type= */ lookup_right->type, /* filters_itself= */ true));
if (type != JoinType::LeftSemi) {
// If we need to produce columns on the probe side, we also have to filter the probe result.
// Note: the filtered ByteArray from the probe side becomes a Char* after filtering.
probe_pipe.attachSuboperator(ColumnFilterLogic::build(this, *filter_pseudo_iu, *scratch_pad_right, *filtered_probe, /* filter_type_= */ lookup_right->type));
}

// 2.4 Unpack everything.
// 2.4.1 Unpack Build Side IUs
size_t build_unpack_offset = 0;
for (const auto& iu : keys_left_out) {
auto& unpacker = probe_pipe.attachSuboperator(KeyUnpackerSubop::build(this, *filtered_build, iu));
KeyPackingRuntimeParams param;
param.offsetSet(IR::UI<2>::build(probe_unpack_offset));
param.offsetSet(IR::UI<2>::build(build_unpack_offset));
reinterpret_cast<KeyUnpackerSubop&>(unpacker).attachRuntimeParams(std::move(param));
probe_unpack_offset += iu.type->numBytes();
build_unpack_offset += iu.type->numBytes();
}
for (const auto& iu : payload_right_out) {
auto& unpacker = probe_pipe.attachSuboperator(KeyUnpackerSubop::build(this, *filtered_probe, iu));
for (const auto& iu : payload_left_out) {
auto& unpacker = probe_pipe.attachSuboperator(KeyUnpackerSubop::build(this, *filtered_build, iu));
KeyPackingRuntimeParams param;
param.offsetSet(IR::UI<2>::build(probe_unpack_offset));
param.offsetSet(IR::UI<2>::build(build_unpack_offset));
reinterpret_cast<KeyUnpackerSubop&>(unpacker).attachRuntimeParams(std::move(param));
probe_unpack_offset += iu.type->numBytes();
build_unpack_offset += iu.type->numBytes();
}
// 2.4.2 Unpack Probe Side IUs. Not needed for semi joins.
if (type != JoinType::LeftSemi) {
size_t probe_unpack_offset = 0;
for (const auto& iu : keys_right_out) {
auto& unpacker = probe_pipe.attachSuboperator(KeyUnpackerSubop::build(this, *filtered_probe, iu));
KeyPackingRuntimeParams param;
param.offsetSet(IR::UI<2>::build(probe_unpack_offset));
reinterpret_cast<KeyUnpackerSubop&>(unpacker).attachRuntimeParams(std::move(param));
probe_unpack_offset += iu.type->numBytes();
}
for (const auto& iu : payload_right_out) {
auto& unpacker = probe_pipe.attachSuboperator(KeyUnpackerSubop::build(this, *filtered_probe, iu));
KeyPackingRuntimeParams param;
param.offsetSet(IR::UI<2>::build(probe_unpack_offset));
reinterpret_cast<KeyUnpackerSubop&>(unpacker).attachRuntimeParams(std::move(param));
probe_unpack_offset += iu.type->numBytes();
}
}
// End vectorized Block.
}
}
}
7 changes: 5 additions & 2 deletions src/algebra/Pipeline.cpp
@@ -35,7 +35,10 @@ std::unique_ptr<Pipeline> Pipeline::repipeRequired(size_t start, size_t end) con
suboperators.begin() + end,
[&](const SuboperatorArc& subop) {
for (auto iu : subop->getIUs()) {
out_provided.insert(iu);
if (!dynamic_cast<IR::Void*>(iu->type.get())) {
// We do not add void IUs, as these represent pseudo IUs that merely connect the graph.
out_provided.insert(iu);
}
}
});

@@ -46,7 +49,7 @@
suboperators.end(),
[&](const SuboperatorArc& subop) {
for (auto iu : subop->getSourceIUs()) {
if (!out_provided.count(iu)) {
if (out_provided.count(iu)) {
out_required.insert(iu);
}
}
24 changes: 24 additions & 0 deletions src/algebra/Pipeline.h
@@ -32,6 +32,30 @@ struct PipelineGraph {
/// Internally, the pipeline is a DAG of suboperators.
struct Pipeline {
public:
/// A scope guard for relaxed operator fusion. Records the pipeline size on construction
/// and, on destruction, marks the suboperators created in between as a vectorized block.
struct ROFScopeGuard {
ROFScopeGuard(Pipeline& pipe_) : pipe(pipe_), size_at_start(pipe.getSubops().size()) {
}

~ROFScopeGuard() {
if (pipe.getSubops().size() > (size_at_start + 2)) {
// Start vectorized execution at the suboperator after the scope guard was set up.
pipe.getSubops()[size_at_start]->setROFStrategy(
Suboperator::OptimizationProperties::ROFStrategy::BeginVectorized);
// End vectorized execution at the currently last suboperator.
pipe.getSubops()[pipe.getSubops().size() - 1]->setROFStrategy(
Suboperator::OptimizationProperties::ROFStrategy::EndVectorized);
}
}

private:
/// The pipeline wrapped into the scope guard.
Pipeline& pipe;
/// How many suboperators existed when the guard was set up.
size_t size_at_start = 0;
};

/// Rebuild the pipeline for a subset of the sub-operators in the given range.
/// Inserts fake sources and fake sinks. Materializes all IUs that are produced by nodes that don't have a strong output link.
std::unique_ptr<Pipeline> repipeAll(size_t start, size_t end) const;
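
To illustrate how the guard is meant to be used (mirroring the probe-pipeline block in `Join.cpp` above), a hypothetical helper could look as follows. This is not part of the commit; the name `attachVectorized` and the assumption that `attachSuboperator` accepts a `SuboperatorArc` are illustrative.

```cpp
#include "algebra/Pipeline.h"

#include <utility>
#include <vector>

// Hypothetical helper, not part of this commit: attach a batch of suboperators
// as a single vectorized ROF block.
void attachVectorized(inkfuse::Pipeline& pipe, std::vector<inkfuse::SuboperatorArc> subops) {
   // Construction only records how many suboperators the pipeline already has.
   inkfuse::Pipeline::ROFScopeGuard rof_guard{pipe};
   for (auto& subop : subops) {
      pipe.attachSuboperator(std::move(subop));
   }
   // When rof_guard is destroyed, the first suboperator attached here is marked
   // BeginVectorized and the last one EndVectorized, provided enough suboperators
   // were attached (see the size check in ~ROFScopeGuard above).
}
```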
18 changes: 18 additions & 0 deletions src/algebra/suboperators/Suboperator.h
@@ -3,6 +3,7 @@

#include "algebra/IU.h"
#include "exec/ExecutionContext.h"
#include <cstddef>
#include <memory>
#include <optional>
#include <set>
@@ -128,8 +129,25 @@ struct Suboperator {
/// optimized way. We can go to smaller chunk sizes that will have better cache
/// locality. This matters as we split prefetching and lookups into two phases.
std::optional<size_t> rt_chunk_size_prefeference = std::nullopt;

/// Strategy for applying relaxed operator fusion. The performance of the final
/// plan depends on where we insert staging points. The ROFStrategy enum
/// can be used to implement staging point heuristics.
enum class ROFStrategy {
/// Retain the current strategy. No preference for the suboperator.
Default,
/// Begin interpreting suboperators in a vectorized fashion at this point.
BeginVectorized,
/// End interpreting suboperators in a vectorized fashion at this point.
EndVectorized,
};
/// ROF Strategy for this suboperator.
ROFStrategy rof_strategy = ROFStrategy::Default;
};
const OptimizationProperties& getOptimizationProperties() const { return optimization_properties; };
void setROFStrategy(OptimizationProperties::ROFStrategy strategy) {
optimization_properties.rof_strategy = strategy;
};

protected:
/// The operator which decayed into this Suboperator.
Expand Down