Skip to content

Commit

Permalink
topN executor refactored ♻️
Browse files Browse the repository at this point in the history
  • Loading branch information
AyoubKaz07 committed Dec 10, 2023
1 parent 1334cec commit f06550f
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 64 deletions.
45 changes: 35 additions & 10 deletions src/execution/topn_executor.cpp
Original file line number Diff line number Diff line change
@@ -1,33 +1,58 @@
#include "execution/executors/topn_executor.h"
#include <queue>

namespace bustub {

TopNExecutor::TopNExecutor(ExecutorContext *exec_ctx, const TopNPlanNode *plan,
std::unique_ptr<AbstractExecutor> &&child_executor)
: AbstractExecutor(exec_ctx),
plan_(plan),
child_executor_(std::move(child_executor)),
comparator_(this), // Pass this TopNExecutor instance to the TupleComparator constructor
topNHeap_(comparator_) // Pass the TupleComparator instance to the priority_queue constructor
{}
: AbstractExecutor(exec_ctx), plan_(plan), child_executor_(std::move(child_executor)) {}

void TopNExecutor::Init() {
// Comparator for the top-N heap
auto comparator = [this](const Tuple &t1, const Tuple &t2) -> bool {
for (const auto &[order_by_type, expr] : plan_->GetOrderBy()) {
if (order_by_type == OrderByType::ASC || order_by_type == OrderByType::DEFAULT) {
if (expr->Evaluate(&t1, child_executor_->GetOutputSchema())
.CompareLessThan(expr->Evaluate(&t2, child_executor_->GetOutputSchema())) == CmpBool::CmpTrue) {
return true;
} else if (expr->Evaluate(&t1, child_executor_->GetOutputSchema())
.CompareGreaterThan(expr->Evaluate(&t2, child_executor_->GetOutputSchema())) ==
CmpBool::CmpTrue) {
return false;
}
}
if (order_by_type == OrderByType::DESC) {
if (expr->Evaluate(&t1, child_executor_->GetOutputSchema())
.CompareLessThan(expr->Evaluate(&t2, child_executor_->GetOutputSchema())) == CmpBool::CmpTrue) {
return false;
} else if (expr->Evaluate(&t1, child_executor_->GetOutputSchema())
.CompareGreaterThan(expr->Evaluate(&t2, child_executor_->GetOutputSchema())) ==
CmpBool::CmpTrue) {
return true;
}
}
}
return false;
};

std::priority_queue<Tuple, std::vector<Tuple>, decltype(comparator)> topNHeap_(comparator);

Tuple tuple;
RID rid;
child_executor_->Init();
// Process each tuple from the child executor
while (child_executor_->Next(&tuple, &rid)) {
// Add the tuple to the top-N heap
this->topNHeap_.push(tuple);
topNHeap_.push(tuple);

// Ensure the heap size doesn't exceed N
while (topNHeap_.size() > plan_->GetN()) {
this->topNHeap_.pop();
topNHeap_.pop();
}
}
while (!(this->topNHeap_.empty())) {
while (!(topNHeap_.empty())) {
sorted_tuples_.push_back(topNHeap_.top());
this->topNHeap_.pop();
topNHeap_.pop();
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/include/execution/executors/delete_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
#pragma once

#include <memory>
#include <string>
#include <utility>
#include <vector>
#include <string>

#include "execution/executor_context.h"
#include "execution/executors/abstract_executor.h"
Expand Down
2 changes: 1 addition & 1 deletion src/include/execution/executors/insert_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
#pragma once

#include <memory>
#include <string>
#include <utility>
#include <vector>
#include <string>

#include "execution/executor_context.h"
#include "execution/executors/abstract_executor.h"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
#pragma once

#include <memory>
#include <vector>
#include <utility>
#include <vector>

#include "execution/executor_context.h"
#include "execution/executors/abstract_executor.h"
Expand Down
2 changes: 1 addition & 1 deletion src/include/execution/executors/seq_scan_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@

#pragma once

#include <vector>
#include <memory>
#include <vector>

#include "execution/executor_context.h"
#include "execution/executors/abstract_executor.h"
Expand Down
50 changes: 0 additions & 50 deletions src/include/execution/executors/topn_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,6 @@

namespace bustub {

class TopNExecutor;

struct TupleComparator {
const TopNExecutor *executor_; // Reference to the parent TopNExecutor instance

explicit TupleComparator(const TopNExecutor *executor) : executor_(executor) {}

bool operator()(const Tuple &t1, const Tuple &t2) const;
};

/**
* The TopNExecutor executor executes a topn.
Expand Down Expand Up @@ -67,47 +58,6 @@ class TopNExecutor : public AbstractExecutor {
/** The child executor used to retrieve tuples from */
std::unique_ptr<AbstractExecutor> child_executor_;

// Custom comparator function for the priority queue
struct TupleComparator {
const TopNExecutor *executor_; // Reference to the parent TopNExecutor instance

explicit TupleComparator(const TopNExecutor *executor) : executor_(executor) {}

bool operator()(const Tuple &t1, const Tuple &t2) const {
for (const auto &[order_by_type, expr] : executor_->plan_->GetOrderBy()) {
if (order_by_type == OrderByType::ASC || order_by_type == OrderByType::DEFAULT) {
if (expr->Evaluate(&t1, executor_->child_executor_->GetOutputSchema())
.CompareLessThan(expr->Evaluate(&t2, executor_->child_executor_->GetOutputSchema())) ==
CmpBool::CmpTrue) {
return true;
} else if (expr->Evaluate(&t1, executor_->child_executor_->GetOutputSchema())
.CompareGreaterThan(expr->Evaluate(&t2, executor_->child_executor_->GetOutputSchema())) ==
CmpBool::CmpTrue) {
return false;
}
}
if (order_by_type == OrderByType::DESC) {
if (expr->Evaluate(&t1, executor_->child_executor_->GetOutputSchema())
.CompareLessThan(expr->Evaluate(&t2, executor_->child_executor_->GetOutputSchema())) ==
CmpBool::CmpTrue) {
return false;
} else if (expr->Evaluate(&t1, executor_->child_executor_->GetOutputSchema())
.CompareGreaterThan(expr->Evaluate(&t2, executor_->child_executor_->GetOutputSchema())) ==
CmpBool::CmpTrue) {
return true;
}
}
}
return false;
}
};

/** Tuple comparator */
TupleComparator comparator_;

/** The topn heap */
std::priority_queue<Tuple, std::vector<Tuple>, TupleComparator> topNHeap_;

/** Tuples result (stack)*/
std::vector<Tuple> sorted_tuples_;
};
Expand Down

0 comments on commit f06550f

Please sign in to comment.