Skip to content

Commit df4a9ea

Browse files
committed
Move uninit slice, add pop to vec methods
1 parent a07628b commit df4a9ea

File tree

9 files changed

+226
-74
lines changed

9 files changed

+226
-74
lines changed

async/src/tests.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,18 +71,37 @@ fn push_pop_slice() {
7171
async move {
7272
let mut prod = prod;
7373
let data = (0..COUNT).collect::<Vec<_>>();
74-
prod.push_slice_all(&data).await.unwrap();
74+
prod.push_exact(&data).await.unwrap();
7575
},
7676
async move {
7777
let mut cons = cons;
7878
let mut data = [0; COUNT + 1];
79-
let count = cons.pop_slice_all(&mut data).await.unwrap_err();
79+
let count = cons.pop_exact(&mut data).await.unwrap_err();
8080
assert_eq!(count, COUNT);
8181
assert!(data.into_iter().take(COUNT).eq(0..COUNT));
8282
},
8383
);
8484
}
8585

86+
#[test]
87+
fn push_pop_vec() {
88+
let (prod, cons) = AsyncHeapRb::<usize>::new(3).split();
89+
execute!(
90+
async move {
91+
let mut prod = prod;
92+
let data = (0..COUNT).collect::<Vec<_>>();
93+
prod.push_exact(&data).await.unwrap();
94+
},
95+
async move {
96+
let mut cons = cons;
97+
let mut data = Vec::new();
98+
cons.pop_until_end(&mut data).await;
99+
assert_eq!(data.len(), COUNT);
100+
assert!(data.into_iter().eq(0..COUNT));
101+
},
102+
);
103+
}
104+
86105
#[test]
87106
fn sink_stream() {
88107
use futures::{

async/src/traits/consumer.rs

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ pub trait AsyncConsumer: Consumer {
4343
/// Future returns:
4444
/// + `Ok` - the whole slice is filled with the items from the buffer.
4545
/// + `Err(count)` - the buffer is empty and the corresponding producer was dropped, number of items copied to slice is returned.
46-
fn pop_slice_all<'a: 'b, 'b>(&'a mut self, slice: &'b mut [Self::Item]) -> PopSliceFuture<'a, 'b, Self>
46+
fn pop_exact<'a: 'b, 'b>(&'a mut self, slice: &'b mut [Self::Item]) -> PopSliceFuture<'a, 'b, Self>
4747
where
4848
Self::Item: Copy,
4949
{
@@ -54,6 +54,14 @@ pub trait AsyncConsumer: Consumer {
5454
}
5555
}
5656

57+
#[cfg(feature = "alloc")]
58+
fn pop_until_end<'a: 'b, 'b>(&'a mut self, vec: &'b mut alloc::vec::Vec<Self::Item>) -> PopVecFuture<'a, 'b, Self> {
59+
PopVecFuture {
60+
owner: self,
61+
vec: Some(vec),
62+
}
63+
}
64+
5765
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
5866
where
5967
Self: Unpin,
@@ -177,6 +185,53 @@ where
177185
}
178186
}
179187

188+
#[cfg(feature = "alloc")]
189+
pub struct PopVecFuture<'a, 'b, A: AsyncConsumer + ?Sized> {
190+
owner: &'a mut A,
191+
vec: Option<&'b mut alloc::vec::Vec<A::Item>>,
192+
}
193+
#[cfg(feature = "alloc")]
194+
impl<'a, 'b, A: AsyncConsumer> Unpin for PopVecFuture<'a, 'b, A> {}
195+
#[cfg(feature = "alloc")]
196+
impl<'a, 'b, A: AsyncConsumer> FusedFuture for PopVecFuture<'a, 'b, A> {
197+
fn is_terminated(&self) -> bool {
198+
self.vec.is_none()
199+
}
200+
}
201+
#[cfg(feature = "alloc")]
202+
impl<'a, 'b, A: AsyncConsumer> Future for PopVecFuture<'a, 'b, A> {
203+
type Output = ();
204+
205+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
206+
let mut waker_registered = false;
207+
loop {
208+
let closed = self.owner.is_closed();
209+
let vec = self.vec.take().unwrap();
210+
211+
loop {
212+
if vec.len() == vec.capacity() {
213+
vec.reserve(vec.capacity().max(16));
214+
}
215+
let n = self.owner.pop_slice_uninit(vec.spare_capacity_mut());
216+
if n == 0 {
217+
break;
218+
}
219+
unsafe { vec.set_len(vec.len() + n) };
220+
}
221+
222+
if closed {
223+
break Poll::Ready(());
224+
}
225+
self.vec.replace(vec);
226+
if waker_registered {
227+
break Poll::Pending;
228+
}
229+
self.owner.register_waker(cx.waker());
230+
waker_registered = true;
231+
}
232+
}
233+
}
234+
180235
pub struct WaitOccupiedFuture<'a, A: AsyncConsumer + ?Sized> {
181236
owner: &'a A,
182237
count: usize,

async/src/traits/producer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ pub trait AsyncProducer: Producer {
5959
/// Future returns:
6060
/// + `Ok` - all slice contents are copied.
6161
/// + `Err(count)` - the corresponding consumer was dropped, number of copied items returned.
62-
fn push_slice_all<'a: 'b, 'b>(&'a mut self, slice: &'b [Self::Item]) -> PushSliceFuture<'a, 'b, Self>
62+
fn push_exact<'a: 'b, 'b>(&'a mut self, slice: &'b [Self::Item]) -> PushSliceFuture<'a, 'b, Self>
6363
where
6464
Self::Item: Copy,
6565
{

blocking/src/tests.rs

Lines changed: 93 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::{traits::*, wrap::WaitError, BlockingHeapRb};
22
use std::{
33
io::{Read, Write},
4+
sync::Arc,
45
thread,
56
time::Duration,
67
vec,
@@ -17,6 +18,7 @@ This book fully embraces the potential of Rust to empower its users. It's a frie
1718
1819
- Nicholas Matsakis and Aaron Turon
1920
";
21+
const N_REP: usize = 10;
2022

2123
const TIMEOUT: Option<Duration> = Some(Duration::from_millis(1000));
2224

@@ -26,16 +28,19 @@ fn wait() {
2628
let rb = BlockingHeapRb::<u8>::new(7);
2729
let (mut prod, mut cons) = rb.split();
2830

29-
let smsg = THE_BOOK_FOREWORD;
30-
31-
let pjh = thread::spawn(move || {
32-
let mut bytes = smsg;
33-
prod.set_timeout(TIMEOUT);
34-
while !bytes.is_empty() {
35-
assert_eq!(prod.wait_vacant(1), Ok(()));
36-
let n = prod.push_slice(bytes);
37-
assert!(n > 0);
38-
bytes = &bytes[n..bytes.len()]
31+
let smsg = Arc::new(THE_BOOK_FOREWORD.repeat(N_REP));
32+
33+
let pjh = thread::spawn({
34+
let smsg = smsg.clone();
35+
move || {
36+
let mut bytes = smsg.as_slice();
37+
prod.set_timeout(TIMEOUT);
38+
while !bytes.is_empty() {
39+
assert_eq!(prod.wait_vacant(1), Ok(()));
40+
let n = prod.push_slice(bytes);
41+
assert!(n > 0);
42+
bytes = &bytes[n..bytes.len()]
43+
}
3944
}
4045
});
4146

@@ -59,7 +64,7 @@ fn wait() {
5964
pjh.join().unwrap();
6065
let rmsg = cjh.join().unwrap();
6166

62-
assert_eq!(smsg, rmsg);
67+
assert_eq!(*smsg, rmsg);
6368
}
6469

6570
#[test]
@@ -68,25 +73,65 @@ fn slice_all() {
6873
let rb = BlockingHeapRb::<u8>::new(7);
6974
let (mut prod, mut cons) = rb.split();
7075

71-
let smsg = THE_BOOK_FOREWORD;
76+
let smsg = Arc::new(THE_BOOK_FOREWORD.repeat(N_REP));
7277

73-
let pjh = thread::spawn(move || {
74-
let bytes = smsg;
75-
prod.set_timeout(TIMEOUT);
76-
assert_eq!(prod.push_all_slice(bytes), bytes.len());
78+
let pjh = thread::spawn({
79+
let smsg = smsg.clone();
80+
move || {
81+
let bytes = smsg;
82+
prod.set_timeout(TIMEOUT);
83+
assert_eq!(prod.push_exact(&bytes), bytes.len());
84+
}
7785
});
7886

79-
let cjh = thread::spawn(move || {
80-
let mut bytes = vec![0u8; smsg.len()];
81-
cons.set_timeout(TIMEOUT);
82-
assert_eq!(cons.pop_all_slice(&mut bytes), bytes.len());
83-
bytes
87+
let cjh = thread::spawn({
88+
let smsg = smsg.clone();
89+
move || {
90+
let mut bytes = vec![0u8; smsg.len()];
91+
cons.set_timeout(TIMEOUT);
92+
assert_eq!(cons.pop_exact(&mut bytes), bytes.len());
93+
bytes
94+
}
8495
});
8596

8697
pjh.join().unwrap();
8798
let rmsg = cjh.join().unwrap();
8899

89-
assert_eq!(smsg, rmsg);
100+
assert_eq!(*smsg, rmsg);
101+
}
102+
103+
#[test]
104+
#[cfg_attr(miri, ignore)]
105+
fn vec_all() {
106+
let rb = BlockingHeapRb::<u8>::new(7);
107+
let (mut prod, mut cons) = rb.split();
108+
109+
let smsg = Arc::new(THE_BOOK_FOREWORD.repeat(N_REP));
110+
111+
let pjh = thread::spawn({
112+
let smsg = smsg.clone();
113+
move || {
114+
let bytes = smsg;
115+
prod.set_timeout(TIMEOUT);
116+
assert_eq!(prod.push_exact(&bytes), bytes.len());
117+
}
118+
});
119+
120+
let cjh = thread::spawn({
121+
let smsg = smsg.clone();
122+
move || {
123+
let mut bytes = Vec::new();
124+
cons.set_timeout(TIMEOUT);
125+
cons.pop_until_end(&mut bytes);
126+
assert_eq!(bytes.len(), smsg.len());
127+
bytes
128+
}
129+
});
130+
131+
pjh.join().unwrap();
132+
let rmsg = cjh.join().unwrap();
133+
134+
assert_eq!(*smsg, rmsg);
90135
}
91136

92137
#[test]
@@ -95,12 +140,15 @@ fn iter_all() {
95140
let rb = BlockingHeapRb::<u8>::new(7);
96141
let (mut prod, mut cons) = rb.split();
97142

98-
let smsg = THE_BOOK_FOREWORD;
143+
let smsg = Arc::new(THE_BOOK_FOREWORD.repeat(N_REP));
99144

100-
let pjh = thread::spawn(move || {
101-
prod.set_timeout(TIMEOUT);
102-
let bytes = smsg;
103-
assert_eq!(prod.push_all_iter(bytes.iter().copied()), bytes.len());
145+
let pjh = thread::spawn({
146+
let smsg = smsg.clone();
147+
move || {
148+
prod.set_timeout(TIMEOUT);
149+
let bytes = smsg;
150+
assert_eq!(prod.push_all_iter(bytes.iter().copied()), bytes.len());
151+
}
104152
});
105153

106154
let cjh = thread::spawn(move || {
@@ -111,7 +159,7 @@ fn iter_all() {
111159
pjh.join().unwrap();
112160
let rmsg = cjh.join().unwrap();
113161

114-
assert_eq!(smsg, rmsg);
162+
assert_eq!(*smsg, rmsg);
115163
}
116164

117165
#[test]
@@ -120,23 +168,29 @@ fn write_read() {
120168
let rb = BlockingHeapRb::<u8>::new(7);
121169
let (mut prod, mut cons) = rb.split();
122170

123-
let smsg = THE_BOOK_FOREWORD;
171+
let smsg = Arc::new(THE_BOOK_FOREWORD.repeat(N_REP));
124172

125-
let pjh = thread::spawn(move || {
126-
prod.set_timeout(TIMEOUT);
127-
let bytes = smsg;
128-
prod.write_all(bytes).unwrap();
173+
let pjh = thread::spawn({
174+
let smsg = smsg.clone();
175+
move || {
176+
prod.set_timeout(TIMEOUT);
177+
let bytes = smsg;
178+
prod.write_all(&bytes).unwrap();
179+
}
129180
});
130181

131-
let cjh = thread::spawn(move || {
132-
cons.set_timeout(TIMEOUT);
133-
let mut bytes = Vec::new();
134-
assert_eq!(cons.read_to_end(&mut bytes).unwrap(), smsg.len());
135-
bytes
182+
let cjh = thread::spawn({
183+
let smsg = smsg.clone();
184+
move || {
185+
cons.set_timeout(TIMEOUT);
186+
let mut bytes = Vec::new();
187+
assert_eq!(cons.read_to_end(&mut bytes).unwrap(), smsg.len());
188+
bytes
189+
}
136190
});
137191

138192
pjh.join().unwrap();
139193
let rmsg = cjh.join().unwrap();
140194

141-
assert_eq!(smsg, rmsg);
195+
assert_eq!(*smsg, rmsg);
142196
}

blocking/src/wrap/cons.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ impl<R: BlockingRbRef> BlockingCons<R>
6767
where
6868
<Self as Observer>::Item: Copy,
6969
{
70-
pub fn pop_all_slice(&mut self, mut slice: &mut [<Self as Observer>::Item]) -> usize {
70+
pub fn pop_exact(&mut self, mut slice: &mut [<Self as Observer>::Item]) -> usize {
7171
if slice.is_empty() {
7272
return 0;
7373
}
@@ -83,6 +83,28 @@ where
8383
}
8484
count
8585
}
86+
87+
#[cfg(feature = "alloc")]
88+
pub fn pop_until_end(&mut self, vec: &mut alloc::vec::Vec<<Self as Observer>::Item>) {
89+
if self.is_closed() && self.is_empty() {
90+
return;
91+
}
92+
for _ in wait_iter!(self) {
93+
loop {
94+
if vec.len() == vec.capacity() {
95+
vec.reserve(vec.capacity().max(16));
96+
}
97+
let n = self.base.pop_slice_uninit(vec.spare_capacity_mut());
98+
if n == 0 {
99+
break;
100+
}
101+
unsafe { vec.set_len(vec.len() + n) };
102+
}
103+
if self.is_closed() && self.is_empty() {
104+
break;
105+
}
106+
}
107+
}
86108
}
87109

88110
#[cfg(feature = "std")]

blocking/src/wrap/prod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ impl<R: BlockingRbRef> BlockingProd<R>
8383
where
8484
<Self as Observer>::Item: Copy,
8585
{
86-
pub fn push_all_slice(&mut self, mut slice: &[<Self as Observer>::Item]) -> usize {
86+
pub fn push_exact(&mut self, mut slice: &[<Self as Observer>::Item]) -> usize {
8787
if slice.is_empty() {
8888
return 0;
8989
}

src/rb/macros.rs

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -46,20 +46,6 @@ macro_rules! rb_impl_init {
4646
unsafe { Self::from_raw_parts(crate::utils::boxed_slice_to_uninit(value).into(), read, write) }
4747
}
4848
}
49-
50-
#[cfg(feature = "alloc")]
51-
impl<T> From<alloc::vec::Vec<T>> for $type<crate::storage::Heap<T>> {
52-
fn from(mut vec: alloc::vec::Vec<T>) -> Self {
53-
let ptr = vec.as_mut_ptr() as *mut core::mem::MaybeUninit<T>;
54-
let len = vec.len();
55-
let capacity = vec.capacity();
56-
core::mem::forget(vec);
57-
let data = unsafe { Box::from_raw(core::ptr::slice_from_raw_parts_mut(ptr, capacity)) };
58-
assert_eq!(data.len(), capacity);
59-
assert_eq!(ptr as *const _, data.as_ptr());
60-
unsafe { Self::from_raw_parts(data.into(), 0, len) }
61-
}
62-
}
6349
};
6450
}
6551

0 commit comments

Comments
 (0)