Skip to content

Commit

Permalink
bagpipe: Store a crossbeam GC internally, don't use global singleton
Browse files Browse the repository at this point in the history
- Store a crossgeam GC collector in the BagPipe
- Use this GC instance for pinning, eliminating the dependency on
  the global singleton, and thus on thread-local storage
  • Loading branch information
joshlf committed Mar 7, 2018
1 parent 6f85313 commit f4eaa34
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 169 deletions.
7 changes: 6 additions & 1 deletion bagpipe/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<!-- Copyright 2017 the authors. See the 'Copyright and license' section of the
<!-- Copyright 2017-2018 the authors. See the 'Copyright and license' section of the
README.md file at the top-level directory of this repository.
Licensed under the Apache License, Version 2.0 (the LICENSE-APACHE file) or
Expand All @@ -19,6 +19,11 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
not happen automatically, but there is a new trait to inject cleanup callbacks
to `BagPipe` shutdown.

### Changed
- Bagpipes now store a Crossbeam Epoch GC instance rather than using the global
singleton instance so that using a Bagpipe does not rely on thread-local
storage.

### Fixed
- Fixed a bug where crossbeam TLS would remain uninitialized upon cloning a
`BagPipe`, resulting in stack overflow in `elfmalloc`.
3 changes: 2 additions & 1 deletion bagpipe/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2017 the authors. See the 'Copyright and license' section of the
# Copyright 2017-2018 the authors. See the 'Copyright and license' section of the
# README.md file at the top-level directory of this repository.
#
# Licensed under the Apache License, Version 2.0 (the LICENSE-APACHE file) or
Expand Down Expand Up @@ -31,4 +31,5 @@ huge_segments = []

[dependencies]
crossbeam = "0.2"
crossbeam-epoch = "0.4.0"
num_cpus = "1.5"
89 changes: 59 additions & 30 deletions bagpipe/src/bag.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2017 the authors. See the 'Copyright and license' section of the
// Copyright 2017-2018 the authors. See the 'Copyright and license' section of the
// README.md file at the top-level directory of this repository.
//
// Licensed under the Apache License, Version 2.0 (the LICENSE-APACHE file) or
Expand All @@ -8,7 +8,7 @@
//! Specification of best-effort bags and implementation for `crossbeam`
//! data-structures.
use super::crossbeam::sync::{TreiberStack, SegQueue, MsQueue};
use super::crossbeam::mem::epoch;
use super::crossbeam_epoch::{default_handle, Handle};
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;

Expand All @@ -22,27 +22,47 @@ pub enum PopStatus {

pub type PopResult<T> = Result<T, PopStatus>;


/// A best-effort Bag data-structure.
///
/// As embodied in the `PopResult` definition, `try_pop` is permitted to
/// fail even if the bag in question is not empty.
pub trait SharedWeakBag {
pub trait SharedWeakBag where Self: Sized {
type Item;

fn new_gc(handle: &Handle) -> Self;

/// Returns a new instance of the data-structure.
fn new() -> Self;
fn new() -> Self {
Self::new_gc(&default_handle())
}

fn try_push_gc(&self, handle: &Handle, it: Self::Item) -> Result<(), Self::Item>;

/// Attempts to push `it` onto the data-structure.
///
/// If successful, `try_push` will return `true`.
fn try_push(&self, it: Self::Item) -> Result<(), Self::Item>;
fn try_push(&self, it: Self::Item) -> Result<(), Self::Item> {
self.try_push_gc(&default_handle(), it)
}

fn try_pop_gc(&self, handle: &Handle) -> PopResult<Self::Item>;

/// Attempts to pop a value from the data-structure.
///
/// There is no guaranteed ordering of popped values. This method
/// may fail arbitrarily even if there are accessible values in the
/// data-structure.
fn try_pop(&self) -> PopResult<Self::Item>;
fn try_pop(&self) -> PopResult<Self::Item> {
self.try_pop_gc(&default_handle())
}

fn push_gc(&self, handle: &Handle, it: Self::Item) {
let _g = handle.pin();
let mut cur_item = it;
while let Err(old_item) = self.try_push_gc(handle, cur_item) {
cur_item = old_item
}
}

/// A push operation that will not fail.
///
Expand All @@ -54,27 +74,27 @@ pub trait SharedWeakBag {
/// `push` also creates a `Guard` for the duration of the function
/// to avoid excessive checking in the hot loop.
fn push(&self, it: Self::Item) {
let _g = epoch::pin();
let mut cur_item = it;
while let Err(old_item) = self.try_push(cur_item) {
cur_item = old_item
}
self.push_gc(&default_handle(), it)
}

/// A pop operation that will not fail.
///
/// Same caveats apply to those of `push`.
fn pop(&self) -> Option<Self::Item> {
let _g = epoch::pin();
fn pop_gc(&self, handle: &Handle) -> Option<Self::Item> {
let _g = handle.pin();
loop {
return match self.try_pop() {
return match self.try_pop_gc(handle) {
Ok(it) => Some(it),
Err(PopStatus::Empty) => None,
Err(PopStatus::TransientFailure) => continue,
};
}
}

/// A pop operation that will not fail.
///
/// Same caveats apply to those of `push`.
fn pop(&self) -> Option<Self::Item> {
self.pop_gc(&default_handle())
}

fn debug(&self) {}
}

Expand All @@ -88,18 +108,18 @@ pub trait SharedWeakBag {
pub trait WeakBag: Clone {
// TODO(ezrosent): should we keep Clone here?
type Item;
// fn new() -> Self;

fn try_push_mut(&mut self, Self::Item) -> Result<(), Self::Item>;
fn try_pop_mut(&mut self) -> PopResult<Self::Item>;
fn push_mut(&mut self, it: Self::Item) {
let _g = epoch::pin();
// TODO(joshlf): Pin the WeakBag's GC for performance
let mut cur_item = it;
while let Err(old_item) = self.try_push_mut(cur_item) {
cur_item = old_item
}
}
fn pop_mut(&mut self) -> Option<Self::Item> {
let _g = epoch::pin();
// TODO(joshlf): Pin the WeakBag's GC for performance
loop {
return match self.try_pop_mut() {
Ok(it) => Some(it),
Expand Down Expand Up @@ -197,14 +217,17 @@ where

impl<T> SharedWeakBag for TreiberStack<T> {
type Item = T;
fn new() -> Self {

fn new_gc(_handle: &Handle) -> Self {
Self::new()
}
fn try_push(&self, t: T) -> Result<(), T> {

fn try_push_gc(&self, _handle: &Handle, t: T) -> Result<(), T> {
self.push(t);
Ok(())
}
fn try_pop(&self) -> PopResult<T> {

fn try_pop_gc(&self, _handle: &Handle) -> PopResult<T> {
match self.pop() {
Some(res) => Ok(res),
None => Err(PopStatus::Empty),
Expand All @@ -214,14 +237,17 @@ impl<T> SharedWeakBag for TreiberStack<T> {

impl<T> SharedWeakBag for SegQueue<T> {
type Item = T;
fn new() -> Self {

fn new_gc(_handle: &Handle) -> Self {
Self::new()
}
fn try_push(&self, t: T) -> Result<(), T> {

fn try_push_gc(&self, _handle: &Handle, t: T) -> Result<(), T> {
self.push(t);
Ok(())
}
fn try_pop(&self) -> PopResult<T> {

fn try_pop_gc(&self, _handle: &Handle) -> PopResult<T> {
match self.try_pop() {
Some(res) => Ok(res),
None => Err(PopStatus::Empty),
Expand All @@ -231,14 +257,17 @@ impl<T> SharedWeakBag for SegQueue<T> {

impl<T> SharedWeakBag for MsQueue<T> {
type Item = T;
fn new() -> Self {

fn new_gc(_handle: &Handle) -> Self {
Self::new()
}
fn try_push(&self, t: T) -> Result<(), T> {

fn try_push_gc(&self, _handle: &Handle, t: T) -> Result<(), T> {
self.push(t);
Ok(())
}
fn try_pop(&self) -> PopResult<T> {

fn try_pop_gc(&self, _handle: &Handle) -> PopResult<T> {
match self.try_pop() {
Some(res) => Ok(res),
None => Err(PopStatus::Empty),
Expand Down
8 changes: 4 additions & 4 deletions bagpipe/src/bin/bench_bag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ fn enqueue_dequeue_pairs_usize<W>(npairs: usize,
prefill_with: usize,
description: String)
-> WorkloadStats
where W: WeakBag<Item = usize> + Send + Sync + 'static + Default
where W: WeakBag<Item = usize> + Send + 'static + Default
{
let wb = W::default();
for i in 0..prefill_with {
Expand Down Expand Up @@ -113,7 +113,7 @@ fn enqueue_dequeue_pairs_strong<W>(npairs: usize,
prefill_with: usize,
description: String)
-> WorkloadStats
where W: WeakBag<Item = usize> + Send + Sync + 'static + Default
where W: WeakBag<Item = usize> + Send + 'static + Default
{
let wb = W::default();
for i in 0..prefill_with {
Expand Down Expand Up @@ -168,7 +168,7 @@ fn producer_consumer_strong<W>(npairs: usize,
prefill_with: usize,
description: String)
-> WorkloadStats
where W: WeakBag<Item = usize> + Send + Sync + 'static + Default
where W: WeakBag<Item = usize> + Send + 'static + Default
{
let wb = W::default();
for i in 0..prefill_with {
Expand Down Expand Up @@ -238,7 +238,7 @@ fn enqueue_dequeue_usize<W>(npairs: usize,
prefill_with: usize,
description: String)
-> WorkloadStats
where W: WeakBag<Item = usize> + Send + Sync + 'static + Default
where W: WeakBag<Item = usize> + Send + 'static + Default
{
let wb = W::default();
for i in 0..prefill_with {
Expand Down
29 changes: 23 additions & 6 deletions bagpipe/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2017 the authors. See the 'Copyright and license' section of the
// Copyright 2017-2018 the authors. See the 'Copyright and license' section of the
// README.md file at the top-level directory of this repository.
//
// Licensed under the Apache License, Version 2.0 (the LICENSE-APACHE file) or
Expand Down Expand Up @@ -65,12 +65,14 @@
//! soon.
extern crate crossbeam;
extern crate crossbeam_epoch;
extern crate num_cpus;
use std::marker::PhantomData;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, AtomicIsize, Ordering, fence};
use bag::{WeakBag, SharedWeakBag, RevocableWeakBag, Revocable, PopResult, PopStatus};
use crossbeam::mem::CachePadded;
use crossbeam_epoch::Collector;
use std::mem;

pub mod queue;
Expand Down Expand Up @@ -130,6 +132,7 @@ where
Clean: BagCleanup<Item = B::Item>,
{
pipes: Arc<BagPipeState<B, Clean>>,
gc: crossbeam_epoch::Handle,
offset: usize,
stride: usize,
push_failures: usize,
Expand All @@ -153,7 +156,8 @@ impl<B: SharedWeakBag, Clean: BagCleanup<Item = B::Item>> Clone for BagPipe<B, C
// handle for elfmalloc is initialized, reentrancy guards are no longer checked; this
// forces later calls to elfmalloc that require EBMR to initialize TLS, and call malloc:
// resulting in a recursive malloc call that blows out the stack.
let _ = crossbeam::mem::epoch::pin();
let _ = self.gc.pin();

#[cfg(feature="prime_schedules")]
let offset = {
primes::get(self.pipes.all_refs.fetch_add(1, Ordering::Relaxed) + 1)
Expand All @@ -165,6 +169,7 @@ impl<B: SharedWeakBag, Clean: BagCleanup<Item = B::Item>> Clone for BagPipe<B, C
};
BagPipe {
pipes: self.pipes.clone(),
gc: self.pipes.gc.handle(),
offset: offset & (self.pipes.pipes.len() - 1),
stride: offset,
push_failures: 0,
Expand All @@ -181,9 +186,14 @@ impl<B: SharedWeakBag, Clean: BagCleanup<Item = B::Item>> BagPipe<B, Clean> {
#[cfg(not(feature="prime_schedules"))]
let offset = 1;
debug_assert!(size > 0);

let pipes = Arc::new(BagPipeState::new_size(size, clean));
let gc = pipes.gc.handle();

BagPipe {
pipes: Arc::new(BagPipeState::new_size(size, clean)),
offset: offset,
pipes,
gc,
offset,
stride: offset,
push_failures: 0,
pop_failures: 0,
Expand Down Expand Up @@ -252,9 +262,14 @@ impl<B: SharedWeakBag, Clean: BagCleanup<Item = B::Item> + Default> BagPipe<B, C
let offset = primes::get(1);
#[cfg(not(feature="prime_schedules"))]
let offset = 1;

let pipes = Arc::new(BagPipeState::new(Clean::default()));
let gc = pipes.gc.handle();

BagPipe {
pipes: Arc::new(BagPipeState::new(Clean::default())),
offset: offset,
pipes,
gc,
offset,
stride: offset,
push_failures: 0,
pop_failures: 0,
Expand Down Expand Up @@ -382,6 +397,7 @@ struct BagPipeState<B: SharedWeakBag, Clean: BagCleanup<Item = B::Item>> {
counters: [CachePadded<AtomicIsize>; N_COUNTERS],
pipes: Vec<B>,
clean: Clean,
gc: Collector,
}

impl<B: SharedWeakBag, Clean: BagCleanup<Item = B::Item>> Drop for BagPipeState<B, Clean> {
Expand All @@ -400,6 +416,7 @@ impl<B: SharedWeakBag, Clean: BagCleanup<Item = B::Item>> BagPipeState<B, Clean>
counters: unsafe { mem::transmute([[0 as usize; 32]; N_COUNTERS]) },
pipes: Vec::with_capacity(len),
clean: clean,
gc: Collector::new(),
};
for _ in 0..len {
res.pipes.push(B::new())
Expand Down
Loading

0 comments on commit f4eaa34

Please sign in to comment.