Skip to content

Commit

Permalink
Fix Outer Joins
Browse files Browse the repository at this point in the history
Only let one thread produce the non-matched tuples.
  • Loading branch information
wagjamin authored and Benjamin Wagner committed Nov 8, 2023
1 parent bebaaf6 commit 488c021
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 7 deletions.
21 changes: 18 additions & 3 deletions src/algebra/suboperators/sources/HashTableSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,10 @@ template <class HashTable>
void HashTableSource<HashTable>::setUpStateImpl(const ExecutionContext& context) {
assert(deferred_state);

for (size_t k = 0; k < context.getNumThreads(); ++k) {
auto& state = (*states)[k];
void* ht_ptr = deferred_state->access(k);
if constexpr (std::is_same_v<HashTable, AtomicHashTable<SimpleKeyComparator>>) {
// Only the first thread produces the null marked tuples.
auto& state = (*states)[0];
void* ht_ptr = deferred_state->access(0);
state.hash_table = ht_ptr;
assert(ht_ptr);
HashTable* hash_table = reinterpret_cast<HashTable*>(ht_ptr);
Expand All @@ -170,6 +171,20 @@ void HashTableSource<HashTable>::setUpStateImpl(const ExecutionContext& context)
// Set the end iterator to the same value. This way the first pickMorsel will work.
state.it_ptr_end = state.it_ptr_start;
state.it_idx_end = state.it_idx_start;
} else {
// All threads pin themselves to their hash table.
for (size_t k = 0; k < context.getNumThreads(); ++k) {
auto& state = (*states)[k];
void* ht_ptr = deferred_state->access(k);
state.hash_table = ht_ptr;
assert(ht_ptr);
HashTable* hash_table = reinterpret_cast<HashTable*>(ht_ptr);
// Initialize start to point to the first slot of the hash table.
hash_table->iteratorStart(&(state.it_ptr_start), &(state.it_idx_start));
// Set the end iterator to the same value. This way the first pickMorsel will work.
state.it_ptr_end = state.it_ptr_start;
state.it_idx_end = state.it_idx_start;
}
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/algebra/suboperators/sources/HashTableSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ struct HashTableSourceState {
static void registerRuntime();

/// Backing hash table from which we want to read.
void* hash_table;
void* hash_table = nullptr;
/// Iterator pointer at which to start.
char* it_ptr_start;
char* it_ptr_start = nullptr;
/// Current iterator index.
uint64_t it_idx_start;
/// Iterator pointer at which to end.
char* it_ptr_end;
char* it_ptr_end = nullptr;
/// Iterator end index.
uint64_t it_idx_end;
};
Expand Down
2 changes: 1 addition & 1 deletion test/tpch/test_queries.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ INSTANTIATE_TEST_CASE_P(
tpch_queries,
TPCHQueriesTestT,
::testing::Combine(
::testing::Values("q1", "q3", "q4", "q5", "q6", "q13", "q14", "q18", "q19", "l_count", "q_bigjoin", "l_point"),
::testing::Values("q1", "q3", "q4", "q5", "q6", "q14", "q18", "q19", "l_count", "q_bigjoin", "l_point"),
::testing::Values(
PipelineExecutor::ExecutionMode::Fused,
PipelineExecutor::ExecutionMode::Interpreted,
Expand Down

0 comments on commit 488c021

Please sign in to comment.