Skip to content

Commit b387e33

Browse files
committed
bagpipe: Store a crossbeam GC internally, don't use global singleton
- Store a crossgeam GC collector in the BagPipe - Use this GC instance for pinning, eliminating the dependency on the global singleton, and thus on thread-local storage - elfmalloc: Don't set bsalloc as the global allocator because the new GC allocates on clone, and if the global allocator is bsalloc, each clone call becomes very expensive, which makes tests run very slowly - elfmalloc: Use opt-level=3 for tests in Travis - elfc: Set bsalloc as the global allocator
1 parent b609f26 commit b387e33

File tree

11 files changed

+270
-209
lines changed

11 files changed

+270
-209
lines changed

bagpipe/CHANGELOG.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
<!-- Copyright 2017 the authors. See the 'Copyright and license' section of the
1+
<!-- Copyright 2017-2018 the authors. See the 'Copyright and license' section of the
22
README.md file at the top-level directory of this repository.
33
44
Licensed under the Apache License, Version 2.0 (the LICENSE-APACHE file) or
@@ -19,6 +19,11 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
1919
not happen automatically, but there is a new trait to inject cleanup callbacks
2020
to `BagPipe` shutdown.
2121

22+
### Changed
23+
- Bagpipes now store a Crossbeam Epoch GC instance rather than using the global
24+
singleton instance so that using a Bagpipe does not rely on thread-local
25+
storage.
26+
2227
### Fixed
2328
- Fixed a bug where crossbeam TLS would remain uninitialized upon cloning a
2429
`BagPipe`, resulting in stack overflow in `elfmalloc`.

bagpipe/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2017 the authors. See the 'Copyright and license' section of the
1+
# Copyright 2017-2018 the authors. See the 'Copyright and license' section of the
22
# README.md file at the top-level directory of this repository.
33
#
44
# Licensed under the Apache License, Version 2.0 (the LICENSE-APACHE file) or
@@ -31,4 +31,5 @@ huge_segments = []
3131

3232
[dependencies]
3333
crossbeam = "0.2"
34+
crossbeam-epoch = "0.4.0"
3435
num_cpus = "1.5"

bagpipe/src/bag.rs

Lines changed: 59 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2017 the authors. See the 'Copyright and license' section of the
1+
// Copyright 2017-2018 the authors. See the 'Copyright and license' section of the
22
// README.md file at the top-level directory of this repository.
33
//
44
// Licensed under the Apache License, Version 2.0 (the LICENSE-APACHE file) or
@@ -8,7 +8,7 @@
88
//! Specification of best-effort bags and implementation for `crossbeam`
99
//! data-structures.
1010
use super::crossbeam::sync::{TreiberStack, SegQueue, MsQueue};
11-
use super::crossbeam::mem::epoch;
11+
use super::crossbeam_epoch::{Collector, Guard, Handle};
1212
use std::sync::atomic::AtomicUsize;
1313
use std::sync::Arc;
1414

@@ -22,52 +22,47 @@ pub enum PopStatus {
2222

2323
pub type PopResult<T> = Result<T, PopStatus>;
2424

25-
2625
/// A best-effort Bag data-structure.
2726
///
2827
/// As embodied in the `PopResult` definition, `try_pop` is permitted to
2928
/// fail even if the bag in question is not empty.
30-
pub trait SharedWeakBag {
29+
pub trait SharedWeakBag where Self: Sized {
3130
type Item;
31+
3232
/// Returns a new instance of the data-structure.
3333
fn new() -> Self;
3434

3535
/// Attempts to push `it` onto the data-structure.
3636
///
3737
/// If successful, `try_push` will return `true`.
38-
fn try_push(&self, it: Self::Item) -> Result<(), Self::Item>;
38+
fn try_push(&self, guard: &Guard, it: Self::Item) -> Result<(), Self::Item>;
3939

4040
/// Attempts to pop a value from the data-structure.
4141
///
4242
/// There is no guaranteed ordering of popped values. This method
4343
/// may fail arbitrarily even if there are accessible values in the
4444
/// data-structure.
45-
fn try_pop(&self) -> PopResult<Self::Item>;
45+
fn try_pop(&self, guard: &Guard) -> PopResult<Self::Item>;
4646

4747
/// A push operation that will not fail.
4848
///
4949
/// The default implementation of `push` simply calls `try_push`
5050
/// in a loop. until it succeeds. Depending on the underlying
5151
/// data-structure this may loop infinitely under some
5252
/// circumstances.
53-
///
54-
/// `push` also creates a `Guard` for the duration of the function
55-
/// to avoid excessive checking in the hot loop.
56-
fn push(&self, it: Self::Item) {
57-
let _g = epoch::pin();
53+
fn push(&self, guard: &Guard, it: Self::Item) {
5854
let mut cur_item = it;
59-
while let Err(old_item) = self.try_push(cur_item) {
55+
while let Err(old_item) = self.try_push(guard, cur_item) {
6056
cur_item = old_item
6157
}
6258
}
6359

6460
/// A pop operation that will not fail.
6561
///
6662
/// Same caveats apply to those of `push`.
67-
fn pop(&self) -> Option<Self::Item> {
68-
let _g = epoch::pin();
63+
fn pop(&self, guard: &Guard) -> Option<Self::Item> {
6964
loop {
70-
return match self.try_pop() {
65+
return match self.try_pop(guard) {
7166
Ok(it) => Some(it),
7267
Err(PopStatus::Empty) => None,
7368
Err(PopStatus::TransientFailure) => continue,
@@ -88,24 +83,24 @@ pub trait SharedWeakBag {
8883
pub trait WeakBag: Clone {
8984
// TODO(ezrosent): should we keep Clone here?
9085
type Item;
91-
// fn new() -> Self;
86+
9287
fn try_push_mut(&mut self, Self::Item) -> Result<(), Self::Item>;
9388
fn try_pop_mut(&mut self) -> PopResult<Self::Item>;
9489
fn push_mut(&mut self, it: Self::Item) {
95-
let _g = epoch::pin();
90+
// TODO(joshlf): Pin the WeakBag's GC for performance
9691
let mut cur_item = it;
9792
while let Err(old_item) = self.try_push_mut(cur_item) {
9893
cur_item = old_item
9994
}
10095
}
10196
fn pop_mut(&mut self) -> Option<Self::Item> {
102-
let _g = epoch::pin();
97+
// TODO(joshlf): Pin the WeakBag's GC for performance
10398
loop {
104-
return match self.try_pop_mut() {
105-
Ok(it) => Some(it),
106-
Err(PopStatus::Empty) => None,
107-
Err(PopStatus::TransientFailure) => continue,
108-
};
99+
match self.try_pop_mut() {
100+
Ok(it) => break Some(it),
101+
Err(PopStatus::Empty) => break None,
102+
Err(PopStatus::TransientFailure) => {}
103+
}
109104
}
110105
}
111106

@@ -120,25 +115,46 @@ pub trait WeakBag: Clone {
120115
}
121116
}
122117

123-
pub struct ArcLike<B>(Arc<B>);
118+
pub struct ArcLike<B>{
119+
arc: Arc<B>,
120+
gc: Handle,
121+
}
124122

125123
impl<B> Clone for ArcLike<B> {
126124
fn clone(&self) -> Self {
127-
ArcLike(self.0.clone())
125+
ArcLike {
126+
arc: self.arc.clone(),
127+
gc: self.gc.clone(),
128+
}
128129
}
129130
}
130131

131132
impl<B: SharedWeakBag> Default for ArcLike<B> {
132-
fn default() -> Self { ArcLike(Arc::new(B::new())) }
133+
fn default() -> Self {
134+
ArcLike {
135+
arc: Arc::new(B::new()),
136+
gc: Collector::new().handle(),
137+
}
138+
}
133139
}
134140

135141
impl<B: SharedWeakBag> WeakBag for ArcLike<B> {
136142
type Item = B::Item;
143+
137144
fn try_push_mut(&mut self, it: Self::Item) -> Result<(), Self::Item> {
138-
self.0.try_push(it)
145+
self.arc.try_push(&self.gc.pin(), it)
139146
}
147+
140148
fn try_pop_mut(&mut self) -> PopResult<Self::Item> {
141-
self.0.try_pop()
149+
self.arc.try_pop(&self.gc.pin())
150+
}
151+
152+
fn push_mut(&mut self, it: Self::Item) {
153+
self.arc.push(&self.gc.pin(), it)
154+
}
155+
156+
fn pop_mut(&mut self) -> Option<Self::Item> {
157+
self.arc.pop(&self.gc.pin())
142158
}
143159
}
144160

@@ -197,14 +213,17 @@ where
197213

198214
impl<T> SharedWeakBag for TreiberStack<T> {
199215
type Item = T;
216+
200217
fn new() -> Self {
201218
Self::new()
202219
}
203-
fn try_push(&self, t: T) -> Result<(), T> {
220+
221+
fn try_push(&self, _guard: &Guard, t: T) -> Result<(), T> {
204222
self.push(t);
205223
Ok(())
206224
}
207-
fn try_pop(&self) -> PopResult<T> {
225+
226+
fn try_pop(&self, _guard: &Guard) -> PopResult<T> {
208227
match self.pop() {
209228
Some(res) => Ok(res),
210229
None => Err(PopStatus::Empty),
@@ -214,14 +233,17 @@ impl<T> SharedWeakBag for TreiberStack<T> {
214233

215234
impl<T> SharedWeakBag for SegQueue<T> {
216235
type Item = T;
236+
217237
fn new() -> Self {
218238
Self::new()
219239
}
220-
fn try_push(&self, t: T) -> Result<(), T> {
240+
241+
fn try_push(&self, _guard: &Guard, t: T) -> Result<(), T> {
221242
self.push(t);
222243
Ok(())
223244
}
224-
fn try_pop(&self) -> PopResult<T> {
245+
246+
fn try_pop(&self, _guard: &Guard) -> PopResult<T> {
225247
match self.try_pop() {
226248
Some(res) => Ok(res),
227249
None => Err(PopStatus::Empty),
@@ -231,14 +253,17 @@ impl<T> SharedWeakBag for SegQueue<T> {
231253

232254
impl<T> SharedWeakBag for MsQueue<T> {
233255
type Item = T;
256+
234257
fn new() -> Self {
235258
Self::new()
236259
}
237-
fn try_push(&self, t: T) -> Result<(), T> {
260+
261+
fn try_push(&self, _guard: &Guard, t: T) -> Result<(), T> {
238262
self.push(t);
239263
Ok(())
240264
}
241-
fn try_pop(&self) -> PopResult<T> {
265+
266+
fn try_pop(&self, _guard: &Guard) -> PopResult<T> {
242267
match self.try_pop() {
243268
Some(res) => Ok(res),
244269
None => Err(PopStatus::Empty),

bagpipe/src/bin/bench_bag.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2017 the authors. See the 'Copyright and license' section of the
1+
// Copyright 2017-2018 the authors. See the 'Copyright and license' section of the
22
// README.md file at the top-level directory of this repository.
33
//
44
// Licensed under the Apache License, Version 2.0 (the LICENSE-APACHE file) or
@@ -52,7 +52,7 @@ fn enqueue_dequeue_pairs_usize<W>(npairs: usize,
5252
prefill_with: usize,
5353
description: String)
5454
-> WorkloadStats
55-
where W: WeakBag<Item = usize> + Send + Sync + 'static + Default
55+
where W: WeakBag<Item = usize> + Send + 'static + Default
5656
{
5757
let wb = W::default();
5858
for i in 0..prefill_with {
@@ -113,7 +113,7 @@ fn enqueue_dequeue_pairs_strong<W>(npairs: usize,
113113
prefill_with: usize,
114114
description: String)
115115
-> WorkloadStats
116-
where W: WeakBag<Item = usize> + Send + Sync + 'static + Default
116+
where W: WeakBag<Item = usize> + Send + 'static + Default
117117
{
118118
let wb = W::default();
119119
for i in 0..prefill_with {
@@ -168,7 +168,7 @@ fn producer_consumer_strong<W>(npairs: usize,
168168
prefill_with: usize,
169169
description: String)
170170
-> WorkloadStats
171-
where W: WeakBag<Item = usize> + Send + Sync + 'static + Default
171+
where W: WeakBag<Item = usize> + Send + 'static + Default
172172
{
173173
let wb = W::default();
174174
for i in 0..prefill_with {
@@ -238,7 +238,7 @@ fn enqueue_dequeue_usize<W>(npairs: usize,
238238
prefill_with: usize,
239239
description: String)
240240
-> WorkloadStats
241-
where W: WeakBag<Item = usize> + Send + Sync + 'static + Default
241+
where W: WeakBag<Item = usize> + Send + 'static + Default
242242
{
243243
let wb = W::default();
244244
for i in 0..prefill_with {

0 commit comments

Comments
 (0)