Skip to content

Commit

Permalink
Add TPC-H Q5
Browse files Browse the repository at this point in the history
Add the physical plan for TPC-H Q5. Add Q5 to the different
test/runner/benchmark binaries.
  • Loading branch information
wagjamin committed Nov 5, 2023
1 parent 795e89d commit d2e32cf
Show file tree
Hide file tree
Showing 6 changed files with 353 additions and 5 deletions.
341 changes: 340 additions & 1 deletion src/common/TPCH.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ const auto t_date = IR::Date::build();

size_t getScanIndex(std::string_view name, const std::vector<std::string>& cols) {
auto elem = std::find(cols.begin(), cols.end(), name);
return std::distance(cols.begin(), elem);
auto distance = std::distance(cols.begin(), elem);
if (distance == cols.size()) {
throw std::runtime_error("Column " + std::string{name} + " does not exist in scan.");
}
return distance;
}

StoredRelationPtr tableLineitem() {
Expand Down Expand Up @@ -94,6 +98,34 @@ StoredRelationPtr tablePart() {
return rel;
}

StoredRelationPtr tableSupplier() {
auto rel = std::make_unique<StoredRelation>();
rel->attachPODColumn("s_suppkey", t_i4);
rel->attachStringColumn("s_name");
rel->attachStringColumn("s_address");
rel->attachPODColumn("s_nationkey", t_i4);
rel->attachStringColumn("s_phone");
rel->attachPODColumn("s_acctbal", t_f8);
rel->attachStringColumn("s_comment");
return rel;
}

StoredRelationPtr tableNation() {
auto rel = std::make_unique<StoredRelation>();
rel->attachPODColumn("n_nationkey", t_i4);
rel->attachStringColumn("n_name");
rel->attachPODColumn("n_regionkey", t_i4);
rel->attachStringColumn("n_comment");
return rel;
}

StoredRelationPtr tableRegion() {
auto rel = std::make_unique<StoredRelation>();
rel->attachPODColumn("r_regionkey", t_i4);
rel->attachStringColumn("r_name");
rel->attachStringColumn("r_comment");
return rel;
}
}

Schema getTPCHSchema() {
Expand All @@ -102,6 +134,9 @@ Schema getTPCHSchema() {
schema.emplace("orders", tableOrders());
schema.emplace("customer", tableCustomer());
schema.emplace("part", tablePart());
schema.emplace("supplier", tableSupplier());
schema.emplace("nation", tableNation());
schema.emplace("region", tableRegion());
return schema;
}

Expand Down Expand Up @@ -599,6 +634,310 @@ std::unique_ptr<Print> q4(const Schema& schema) {
std::move(out_ius), std::move(colnames), "print");
}

std::unique_ptr<Print> q5(const Schema& schema) {
// 1.1 Scan from regions.
auto& region_rel = schema.at("region");
std::vector<std::string> region_cols{
"r_regionkey",
"r_name",
};
auto region_scan = TableScan::build(*region_rel, region_cols, "scan_region");
auto& region_scan_ref = *region_scan;

// 1.2 Filter on r_name = 'Asia'
std::vector<ExpressionOp::NodePtr> r_nodes;
r_nodes.emplace_back(std::make_unique<IURefNode>(region_scan_ref.getOutput()[1]));
auto r_eq = r_nodes.emplace_back(
std::make_unique<ComputeNode>(
ComputeNode::Type::StrEquals,
IR::StringVal::build("ASIA"),
r_nodes[0].get()))
.get();

std::vector<RelAlgOpPtr> children_r_expr;
children_r_expr.push_back(std::move(region_scan));

auto r_expr_node = ExpressionOp::build(
std::move(children_r_expr),
"region_filter",
std::vector<Node*>{r_eq},
std::move(r_nodes));
auto& r_expr_ref = *r_expr_node;
assert(r_expr_ref.getOutput().size() == 1);

std::vector<RelAlgOpPtr> children_r_filter;
children_r_filter.push_back(std::move(r_expr_node));
std::vector<const IU*> r_redefined{
region_scan_ref.getOutput()[getScanIndex("r_regionkey", region_cols)]};
auto r_filter = Filter::build(std::move(children_r_filter), "r_filter", std::move(r_redefined), *r_expr_ref.getOutput()[0]);
auto& r_filter_ref = *r_filter;
assert(r_filter->getOutput().size() == 1);

// 2. Scan from nations.
auto& nation_rel = schema.at("nation");
std::vector<std::string> nation_cols{
"n_nationkey",
"n_regionkey",
"n_name",
};
auto nation_scan = TableScan::build(*nation_rel, nation_cols, "scan_region");
auto& nation_scan_ref = *nation_scan;

// 3. Join r_regionkey = n_regionkey, returning n_nationkey
std::vector<RelAlgOpPtr> r_n_join_children;
r_n_join_children.push_back(std::move(r_filter));
r_n_join_children.push_back(std::move(nation_scan));
auto r_n_join = Join::build(
std::move(r_n_join_children),
"r_n_join",
// Keys left (r_regionkey)
{r_filter_ref.getOutput()[0]},
// Payload left (none)
{},
// Keys right (n_regionkey)
{nation_scan_ref.getOutput()[1]},
// Right payload (n_nationkey, n_name)
{
nation_scan_ref.getOutput()[0],
nation_scan_ref.getOutput()[2],
},
JoinType::Inner,
true);
auto& r_n_join_ref = *r_n_join;
assert(r_n_join_ref.getOutput().size() == 4);

// 4. Scan from customer.
auto& customer_rel = schema.at("customer");
std::vector<std::string> customer_cols{
"c_custkey",
"c_nationkey",
};
auto customer_scan = TableScan::build(*customer_rel, customer_cols, "scan_customer");
auto& customer_scan_ref = *customer_scan;

// 5. Join customer onto the nationkeys.
std::vector<RelAlgOpPtr> n_c_join_children;
n_c_join_children.push_back(std::move(r_n_join));
n_c_join_children.push_back(std::move(customer_scan));
auto n_c_join = Join::build(
std::move(n_c_join_children),
"n_c_join",
// Keys left (n_nationkey)
{r_n_join_ref.getOutput()[2]},
// Payload left (n_name)
{r_n_join_ref.getOutput()[3]},
// Keys right (c_nationkey)
{customer_scan_ref.getOutput()[1]},
// Right payload (c_custkey)
{customer_scan_ref.getOutput()[0]},
JoinType::Inner,
true);
auto& n_c_join_ref = *n_c_join;
assert(n_c_join_ref.getOutput().size() == 3);

// 6.1 Scan from orders.
auto& orders_rel = schema.at("orders");
std::vector<std::string> orders_cols{
"o_orderkey",
"o_custkey",
"o_orderdate",
};
auto orders_scan = TableScan::build(*orders_rel, orders_cols, "scan_orders");
auto& orders_scan_ref = *orders_scan;

// 6.2 Filter
std::vector<ExpressionOp::NodePtr> o_nodes;
o_nodes.emplace_back(std::make_unique<IURefNode>(orders_scan_ref.getOutput()[2]));
// 6.2.1 o_orderdate >= '1994-01-01'
o_nodes.emplace_back(
std::make_unique<ComputeNode>(
ComputeNode::Type::LessEqual,
IR::DateVal::build(helpers::dateStrToInt("1994-01-01")),
o_nodes[0].get()));
// 6.2.2 o_orderdate < '1995-01-01'
o_nodes.emplace_back(
std::make_unique<ComputeNode>(
ComputeNode::Type::Greater,
IR::DateVal::build(helpers::dateStrToInt("1995-01-01")),
o_nodes[0].get()));
o_nodes.emplace_back(
std::make_unique<ComputeNode>(
ComputeNode::Type::And,
std::vector<Node*>{o_nodes[1].get(), o_nodes[2].get()}));

std::vector<RelAlgOpPtr> children_o_expr;
children_o_expr.push_back(std::move(orders_scan));
auto o_expr_node = ExpressionOp::build(
std::move(children_o_expr),
"orders_filter",
std::vector<Node*>{o_nodes[3].get()},
std::move(o_nodes));
auto& o_expr_ref = *o_expr_node;
assert(o_expr_ref.getOutput().size() == 1);

std::vector<RelAlgOpPtr> children_o_filter;
children_o_filter.push_back(std::move(o_expr_node));
std::vector<const IU*> o_redefined{
orders_scan_ref.getOutput()[getScanIndex("o_orderkey", orders_cols)],
orders_scan_ref.getOutput()[getScanIndex("o_custkey", orders_cols)],
};
auto o_filter = Filter::build(std::move(children_o_filter), "o_filter", std::move(o_redefined), *o_expr_ref.getOutput()[0]);
auto& o_filter_ref = *o_filter;
assert(o_filter->getOutput().size() == 2);

// 7. Join customer <-> orders
std::vector<RelAlgOpPtr> c_o_join_children;
c_o_join_children.push_back(std::move(n_c_join));
c_o_join_children.push_back(std::move(o_filter));
auto c_o_join = Join::build(
std::move(c_o_join_children),
"c_o_join",
// Keys left (c_custkey)
{n_c_join_ref.getOutput()[3]},
// Payload left (n_name, c_nationkey)
{
n_c_join_ref.getOutput()[1],
n_c_join_ref.getOutput()[2],
},
// Keys right (o_custkey)
{o_filter_ref.getOutput()[1]},
// Right payload (o_orderkey)
{o_filter_ref.getOutput()[0]},
JoinType::Inner,
true);
auto& c_o_join_ref = *c_o_join;
assert(c_o_join_ref.getOutput().size() == 5);

// 8. Scan lineitem
auto& lineitem_rel = schema.at("lineitem");
std::vector<std::string> lineitem_cols{
"l_orderkey",
"l_suppkey",
"l_extendedprice",
"l_discount",
};
auto lineitem_scan = TableScan::build(*lineitem_rel, lineitem_cols, "scan_lineitem");
auto& lineitem_scan_ref = *lineitem_scan;

// 9. Join orders <-> lineitem
std::vector<RelAlgOpPtr> o_l_join_children;
o_l_join_children.push_back(std::move(c_o_join));
o_l_join_children.push_back(std::move(lineitem_scan));
auto o_l_join = Join::build(
std::move(o_l_join_children),
"o_l_join",
// Keys left (o_orderkey)
{c_o_join_ref.getOutput()[4]},
// Payload left (n_name, c_nationkey)
{
c_o_join_ref.getOutput()[1],
c_o_join_ref.getOutput()[2],
},
// Keys right (l_orderkey)
{lineitem_scan_ref.getOutput()[0]},
// Right payload (l_suppkey, l_extendedprice, l_discount)
{
lineitem_scan_ref.getOutput()[1],
lineitem_scan_ref.getOutput()[2],
lineitem_scan_ref.getOutput()[3],
},
JoinType::Inner,
true);
auto& o_l_join_ref = *o_l_join;
assert(o_l_join_ref.getOutput().size() == 7);

// 10. Scan from Supplier.
auto& supplier_rel = schema.at("supplier");
std::vector<std::string> supplier_cols{
"s_suppkey",
"s_nationkey",
};
auto supplier_scan = TableScan::build(*supplier_rel, supplier_cols, "scan_supplier");
auto& supplier_scan_ref = *supplier_scan;

// 11. Join supplier <-> lineitem
std::vector<RelAlgOpPtr> s_l_join_children;
s_l_join_children.push_back(std::move(supplier_scan));
s_l_join_children.push_back(std::move(o_l_join));
auto s_l_join = Join::build(
std::move(s_l_join_children),
"s_l_join",
// Keys left (s_suppkey, s_nationkey)
{
supplier_scan_ref.getOutput()[0],
supplier_scan_ref.getOutput()[1],
},
// Payload left (none)
{},
// Keys right (l_suppkey, c_nationkey)
{
o_l_join_ref.getOutput()[4],
o_l_join_ref.getOutput()[2],
},
// Right payload (n_name, l_extendedprice, l_discount)
{
o_l_join_ref.getOutput()[1],
o_l_join_ref.getOutput()[5],
o_l_join_ref.getOutput()[6],
},
JoinType::Inner,
true);
auto& s_l_join_ref = *s_l_join;
assert(s_l_join_ref.getOutput().size() == 6);

// 12. Aggregate (n_name, sum(l_extendedprice * (1 - l_discount)))
// 12.1 Expression
std::vector<ExpressionOp::NodePtr> agg_expr_nodes;
agg_expr_nodes.emplace_back(std::make_unique<IURefNode>(s_l_join_ref.getOutput()[5]));
agg_expr_nodes.emplace_back(std::make_unique<IURefNode>(s_l_join_ref.getOutput()[6]));
agg_expr_nodes.emplace_back(
std::make_unique<ComputeNode>(
ComputeNode::Type::Subtract,
IR::F8::build(1.0),
agg_expr_nodes[1].get()));
agg_expr_nodes.emplace_back(
std::make_unique<ComputeNode>(
ComputeNode::Type::Multiply,
std::vector<ExpressionOp::Node*>{agg_expr_nodes[0].get(),
agg_expr_nodes[2].get()}));
auto agg_expr_root = agg_expr_nodes[3].get();
std::vector<RelAlgOpPtr> agg_expr_children;
agg_expr_children.push_back(std::move(s_l_join));
auto agg_expr = ExpressionOp::build(
std::move(agg_expr_children),
"expr_pre_agg",
{agg_expr_root},
std::move(agg_expr_nodes));
auto& agg_expr_ref = *agg_expr;

// 12.2 GROUP BY
std::vector<RelAlgOpPtr> agg_children;
agg_children.push_back(std::move(agg_expr));
// Group by n_name
std::vector<const IU*> group_by{s_l_join_ref.getOutput()[4]};
std::vector<AggregateFunctions::Description> aggregates{
{*agg_expr_ref.getOutput()[0], AggregateFunctions::Opcode::Sum}};
auto agg = Aggregation::build(
std::move(agg_children),
"agg",
std::move(group_by),
std::move(aggregates));

// Print result.
std::vector<const IU*> out_ius{
agg->getOutput()[0],
agg->getOutput()[1],
};
std::vector<std::string> colnames = {
"n_name",
"revenue",
};
std::vector<RelAlgOpPtr> print_children;
print_children.push_back(std::move(agg));
return Print::build(std::move(print_children),
std::move(out_ius), std::move(colnames));
}

std::unique_ptr<Print> q6(const Schema& schema) {
// 1. Scan from lineitem.
auto& rel = schema.at("lineitem");
Expand Down
4 changes: 3 additions & 1 deletion src/common/TPCH.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ std::unique_ptr<Print> q1(const Schema& schema);
std::unique_ptr<Print> q3(const Schema& schema);
/// Join (very small build, big probe) ~70x difference
std::unique_ptr<Print> q4(const Schema& schema);
/// Join (moderate build, big probe) up to ~100x difference
std::unique_ptr<Print> q5(const Schema& schema);
/// Selective filters
std::unique_ptr<Print> q6(const Schema& schema);
/// Join (moderate build, moderate probe) ~2x difference
Expand All @@ -35,4 +37,4 @@ std::unique_ptr<Print> l_point(const Schema& schema);

}

#endif //INKFUSE_TPCH_H
#endif //INKFUSE_TPCH_H
Loading

0 comments on commit d2e32cf

Please sign in to comment.