Skip to content

Commit 139e29f

Browse files
author
Tae Hyoun Park
committed
Improve multithreading partitioning
1 parent 31e22a9 commit 139e29f

File tree

3 files changed

+70
-113
lines changed

3 files changed

+70
-113
lines changed

include/queryosity/dataset_partition.h

Lines changed: 8 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,18 @@ namespace queryosity {
1616

1717
namespace dataset {
1818

19+
using entry_t = unsigned long long;
20+
1921
using partition_t = std::vector<part_t>;
2022

23+
using slot_t = unsigned int;
24+
2125
namespace partition {
2226

2327
partition_t align(std::vector<partition_t> const &partitions);
2428

2529
partition_t truncate(partition_t const &parts, long long nentries_max);
2630

27-
partition_t merge(partition_t const &parts, unsigned int nslots_max);
28-
2931
} // namespace partition
3032

3133
} // namespace dataset
@@ -34,12 +36,12 @@ partition_t merge(partition_t const &parts, unsigned int nslots_max);
3436

3537
inline queryosity::dataset::partition_t queryosity::dataset::partition::align(
3638
std::vector<partition_t> const &partitions) {
37-
std::map<unsigned long long, unsigned int> edge_counts;
39+
std::map<entry_t, unsigned int> edge_counts;
3840
const unsigned int num_vectors = partitions.size();
3941

4042
// Count appearances of each edge
4143
for (const auto &vec : partitions) {
42-
std::map<unsigned long long, bool>
44+
std::map<entry_t, bool>
4345
seen_edges; // Ensure each edge is only counted once per vector
4446
for (const auto &p : vec) {
4547
if (seen_edges.find(p.first) == seen_edges.end()) {
@@ -54,74 +56,22 @@ inline queryosity::dataset::partition_t queryosity::dataset::partition::align(
5456
}
5557

5658
// Filter edges that appear in all vectors
57-
std::vector<unsigned long long> aligned_edges;
59+
std::vector<entry_t> aligned_edges;
5860
for (const auto &pair : edge_counts) {
5961
if (pair.second == num_vectors) {
6062
aligned_edges.push_back(pair.first);
6163
}
6264
}
6365

6466
// Create aligned vector of pairs
65-
std::vector<std::pair<unsigned long long, unsigned long long>> aligned_ranges;
67+
std::vector<std::pair<entry_t, entry_t>> aligned_ranges;
6668
for (size_t i = 0; i < aligned_edges.size() - 1; ++i) {
6769
aligned_ranges.emplace_back(aligned_edges[i], aligned_edges[i + 1]);
6870
}
6971

7072
return aligned_ranges;
7173
}
7274

73-
inline queryosity::dataset::partition_t queryosity::dataset::partition::merge(
74-
queryosity::dataset::partition_t const &parts, unsigned int nslots_max) {
75-
76-
// no merging needed
77-
if (nslots_max >= static_cast<unsigned int>(parts.size()))
78-
return parts;
79-
80-
assert(!parts.empty() && nslots_max > 0);
81-
82-
partition_t parts_merged;
83-
84-
const unsigned int total_size = parts.back().second - parts.front().first;
85-
const unsigned int size_per_slot = total_size / nslots_max;
86-
const unsigned int extra_size = total_size % nslots_max;
87-
88-
unsigned int current_start = parts[0].first;
89-
unsigned int current_end = current_start;
90-
unsigned int accumulated_size = 0;
91-
unsigned int nslots_created = 0;
92-
93-
for (const auto &part : parts) {
94-
unsigned int part_size = part.second - part.first;
95-
// check if another part can be added
96-
if (accumulated_size + part_size >
97-
size_per_slot + (nslots_created < extra_size ? 1 : 0) &&
98-
nslots_created < nslots_max - 1) {
99-
// add the current range if adding next part will exceed the average size
100-
parts_merged.emplace_back(current_start, current_end);
101-
current_start = current_end;
102-
accumulated_size = 0;
103-
++nslots_created;
104-
}
105-
106-
// add part size to the current slot
107-
accumulated_size += part_size;
108-
current_end += part_size;
109-
110-
// handle the last slot differently to include all remaining parts
111-
if (nslots_created == nslots_max - 1) {
112-
parts_merged.emplace_back(current_start, parts.back().second);
113-
break; // All parts have been processed
114-
}
115-
}
116-
117-
// ensure we have exactly nslots_max slots
118-
if (static_cast<unsigned int>(parts_merged.size()) < nslots_max) {
119-
parts_merged.emplace_back(current_start, parts.back().second);
120-
}
121-
122-
return parts_merged;
123-
}
124-
12575
inline queryosity::dataset::partition_t
12676
queryosity::dataset::partition::truncate(
12777
queryosity::dataset::partition_t const &parts, long long nentries_max) {

include/queryosity/dataset_player.h

Lines changed: 37 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@ class player : public query::experiment {
1515

1616
public:
1717
void play(std::vector<std::unique_ptr<source>> const &sources, double scale,
18-
unsigned int slot, unsigned long long begin,
19-
unsigned long long end);
18+
slot_t slot, std::vector<part_t> const& parts);
2019
};
2120

2221
} // namespace dataset
@@ -27,55 +26,56 @@ class player : public query::experiment {
2726

2827
inline void queryosity::dataset::player::play(
2928
std::vector<std::unique_ptr<source>> const &sources, double scale,
30-
unsigned int slot, unsigned long long begin, unsigned long long end) {
29+
slot_t slot, std::vector<part_t> const& parts) {
3130

3231
// apply dataset scale in effect for all queries
3332
for (auto const &qry : m_queries) {
3433
qry->apply_scale(scale);
3534
}
3635

37-
// initialize
38-
for (auto const &ds : sources) {
39-
ds->initialize(slot, begin, end);
40-
}
41-
for (auto const &col : m_columns) {
42-
col->initialize(slot, begin, end);
43-
}
44-
for (auto const &sel : m_selections) {
45-
sel->initialize(slot, begin, end);
46-
}
47-
for (auto const &qry : m_queries) {
48-
qry->initialize(slot, begin, end);
49-
}
50-
51-
// execute
52-
for (auto entry = begin; entry < end; ++entry) {
36+
// traverse each part
37+
for (auto const& part : parts) {
38+
// initialize
5339
for (auto const &ds : sources) {
54-
ds->execute(slot, entry);
40+
ds->initialize(slot, part.first, part.second);
5541
}
5642
for (auto const &col : m_columns) {
57-
col->execute(slot, entry);
43+
col->initialize(slot, part.first, part.second);
5844
}
5945
for (auto const &sel : m_selections) {
60-
sel->execute(slot, entry);
46+
sel->initialize(slot, part.first, part.second);
6147
}
6248
for (auto const &qry : m_queries) {
63-
qry->execute(slot, entry);
49+
qry->initialize(slot, part.first, part.second);
50+
}
51+
// execute
52+
for (auto entry = part.first; entry < part.second; ++entry) {
53+
for (auto const &ds : sources) {
54+
ds->execute(slot, entry);
55+
}
56+
for (auto const &col : m_columns) {
57+
col->execute(slot, entry);
58+
}
59+
for (auto const &sel : m_selections) {
60+
sel->execute(slot, entry);
61+
}
62+
for (auto const &qry : m_queries) {
63+
qry->execute(slot, entry);
64+
}
65+
}
66+
// finalize (in reverse order)
67+
for (auto const &qry : m_queries) {
68+
qry->finalize(slot);
69+
}
70+
for (auto const &sel : m_selections) {
71+
sel->finalize(slot);
72+
}
73+
for (auto const &col : m_columns) {
74+
col->finalize(slot);
75+
}
76+
for (auto const &ds : sources) {
77+
ds->finalize(slot);
6478
}
65-
}
66-
67-
// finalize (in reverse order)
68-
for (auto const &qry : m_queries) {
69-
qry->finalize(slot);
70-
}
71-
for (auto const &sel : m_selections) {
72-
sel->finalize(slot);
73-
}
74-
for (auto const &col : m_columns) {
75-
col->finalize(slot);
76-
}
77-
for (auto const &ds : sources) {
78-
ds->finalize(slot);
7979
}
8080

8181
// clear out queries (should not be re-played)

include/queryosity/dataset_processor.h

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -104,33 +104,40 @@ inline void queryosity::dataset::processor::process(
104104
}
105105

106106
// 2. partition dataset(s)
107-
std::vector<std::vector<std::pair<unsigned long long, unsigned long long>>>
108-
partitions;
107+
// 2.1 get partition from each dataset source
108+
std::vector<partition_t > partitions_from_sources;
109109
for (auto const &ds : sources) {
110-
auto partition = ds->partition();
111-
if (partition.size())
112-
partitions.push_back(partition);
110+
auto partition_from_source = ds->partition();
111+
if (partition_from_source.size())
112+
partitions_from_sources.push_back(std::move(partition_from_source));
113113
}
114-
if (!partitions.size()) {
115-
throw std::logic_error("no valid dataset partition implemented");
114+
if (!partitions_from_sources.size()) {
115+
throw std::runtime_error("no valid dataset partition implemented");
116116
}
117-
// find common denominator partition
118-
auto partition = dataset::partition::align(partitions);
119-
// truncate entries to row limit
120-
partition = dataset::partition::truncate(partition, nrows);
121-
// merge partition to concurrency limit
122-
partition = dataset::partition::merge(partition, nslots);
123-
// match processor & partition parallelism
124-
this->downsize(partition.size());
117+
// 2.2 find common denominator partition
118+
const auto partition_aligned = dataset::partition::align(partitions_from_sources);
119+
// 2.3 truncate entries to row limit
120+
const auto partition_truncated = dataset::partition::truncate(partition_aligned, nrows);
121+
// 2.3 distribute partition amongst threads
122+
std::vector<partition_t> partitions_for_slots(nslots);
123+
auto nparts_remaining = partition_truncated.size();
124+
const auto nparts = nparts_remaining;
125+
while (nparts_remaining) {
126+
for( unsigned int islot=0; islot<nslots ; ++islot) {
127+
partitions_for_slots[islot].push_back(std::move(partition_truncated[nparts-(nparts_remaining--)]));
128+
if (!nparts_remaining) break;
129+
}
130+
}
131+
// todo: can intel tbb distribute slots during parallel processing?
125132

126133
// 3. run event loop
127134
this->run(
128135
[&sources,
129136
scale](dataset::player *plyr, unsigned int slot,
130-
std::pair<unsigned long long, unsigned long long> part) {
131-
plyr->play(sources, scale, slot, part.first, part.second);
137+
std::vector<std::pair<unsigned long long, unsigned long long>> const& parts) {
138+
plyr->play(sources, scale, slot, parts);
132139
},
133-
m_player_ptrs, m_range_slots, partition);
140+
m_player_ptrs, m_range_slots, partitions_for_slots);
134141

135142
// 4. exit event loop
136143
for (auto const &ds : sources) {

0 commit comments

Comments
 (0)