Skip to content

Commit

Permalink
[sssp] revise/streamline conditions for fusion
Browse files Browse the repository at this point in the history
  • Loading branch information
sbeamer committed May 30, 2020
1 parent 9b00ee2 commit 015798b
Showing 1 changed file with 33 additions and 39 deletions.
72 changes: 33 additions & 39 deletions src/sssp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ Returns array of distances for all vertices from given source vertex
This SSSP implementation makes use of the ∆-stepping algorithm [1]. The type
used for weights and distances (WeightT) is typedefined in benchmark.h. The
delta parameter (-d) should be set for each input graph. This implementation
incorporates a new bucket fusion optimization [2] that significantly reduces
the number of iterations needed.
incorporates a new bucket fusion optimization [2] that significantly reduces
the number of iterations (& barriers) needed.
The bins of width delta are actually all thread-local and of type std::vector
so they can grow but are otherwise capacity-proportional. Each iteration is
Expand All @@ -40,15 +40,14 @@ thread-local bin into the shared bin.
Once a vertex is added to a bin, it is not removed, even if its distance is
later updated and it now appears in a lower bin. We find ignoring vertices if
their current distance is less than the min distance for the bin to remove
enough redundant work that this is faster than removing the vertex from older
bins.
their distance is less than the min distance for the current bin removes
enough redundant work to be faster than removing the vertex from older bins.
The bucket fusion optimization [2] executes the next thread-local bin in
the same iteration if the vertices in the next thread-local bin have the
same priority as those in the current shared bin. This optimization greatly
reduces the number of iterations needed without violating the priority-based
execution order, leading to significant speedup on large diameter road networks.
the same iteration if the vertices in the next thread-local bin have the
same priority as those in the current shared bin. This optimization greatly
reduces the number of iterations needed without violating the priority-based
execution order, leading to significant speedup on large diameter road networks.
[1] Ulrich Meyer and Peter Sanders. "δ-stepping: a parallelizable shortest path
algorithm." Journal of Algorithms, 49(1):114–152, 2003.
Expand All @@ -66,30 +65,27 @@ const WeightT kDistInf = numeric_limits<WeightT>::max()/2;
const size_t kMaxBin = numeric_limits<size_t>::max()/2;
const size_t kBinSizeThreshold = 1000;

inline
void RelaxEdges(const WGraph &g, NodeID u, WeightT delta,
int curr_bin_index,
pvector<WeightT> &dist,
vector <vector<NodeID>> &local_bins) {
if (dist[u] >= delta * static_cast<WeightT>(curr_bin_index)) {
for (WNode wn : g.out_neigh(u)) {
WeightT old_dist = dist[wn.v];
WeightT new_dist = dist[u] + wn.w;
if (new_dist < old_dist) {
bool changed_dist = true;
while (!compare_and_swap(dist[wn.v], old_dist, new_dist)) {
old_dist = dist[wn.v];
if (old_dist <= new_dist) {
changed_dist = false;
break;
}
pvector<WeightT> &dist, vector <vector<NodeID>> &local_bins) {
for (WNode wn : g.out_neigh(u)) {
WeightT old_dist = dist[wn.v];
WeightT new_dist = dist[u] + wn.w;
if (new_dist < old_dist) {
bool changed_dist = true;
while (!compare_and_swap(dist[wn.v], old_dist, new_dist)) {
old_dist = dist[wn.v];
if (old_dist <= new_dist) {
changed_dist = false;
break;
}
if (changed_dist) {
size_t dest_bin = new_dist/delta;
if (dest_bin >= local_bins.size()) {
local_bins.resize(dest_bin+1);
}
local_bins[dest_bin].push_back(wn.v);
}
if (changed_dist) {
size_t dest_bin = new_dist/delta;
if (dest_bin >= local_bins.size()) {
local_bins.resize(dest_bin+1);
}
local_bins[dest_bin].push_back(wn.v);
}
}
}
Expand Down Expand Up @@ -117,18 +113,16 @@ pvector<WeightT> DeltaStep(const WGraph &g, NodeID source, WeightT delta) {
#pragma omp for nowait schedule(dynamic, 64)
for (size_t i=0; i < curr_frontier_tail; i++) {
NodeID u = frontier[i];
RelaxEdges(g, u, delta, curr_bin_index, dist, local_bins);
if (dist[u] >= delta * static_cast<WeightT>(curr_bin_index))
RelaxEdges(g, u, delta, dist, local_bins);
}
while (local_bins.size() > 0 && curr_bin_index < local_bins.size() &&
!local_bins[curr_bin_index].empty()) {
size_t curr_bin_size = local_bins[curr_bin_index].size();
while (curr_bin_index < local_bins.size() &&
!local_bins[curr_bin_index].empty() &&
local_bins[curr_bin_index].size() < kBinSizeThreshold) {
vector<NodeID> curr_bin_copy = local_bins[curr_bin_index];
local_bins[curr_bin_index].resize(0);
if (curr_bin_size > kBinSizeThreshold) break;
for (size_t i =0; i < curr_bin_size; i++) {
NodeID u = curr_bin_copy[i];
RelaxEdges(g, u, delta, curr_bin_index, dist, local_bins);
}
for (NodeID u : curr_bin_copy)
RelaxEdges(g, u, delta, dist, local_bins);
}
for (size_t i=curr_bin_index; i < local_bins.size(); i++) {
if (!local_bins[i].empty()) {
Expand Down

0 comments on commit 015798b

Please sign in to comment.