Skip to content

Commit

Permalink
Add pending frame flag to Stream::next
Browse files Browse the repository at this point in the history
  • Loading branch information
dcz-self committed Oct 31, 2024
1 parent 6ef8ead commit af9f952
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 7 deletions.
20 changes: 18 additions & 2 deletions src/io/dmabuf/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,22 @@ impl<'a> Stream where Self: CaptureStream<'a> {
let meta = self.buf_meta[index];
Ok((buf, meta))
}

/// Waits for a buffer to be ready
pub fn wait(&self) -> Result<(), io::Error>{
if self.handle.poll(libc::POLLIN, -1)? == 0 {
// This condition can only happen if there was a timeout.
// A timeout is only possible if the `timeout` value is non-zero, meaning we should
// propagate it to the caller.
return Err(io::Error::new(io::ErrorKind::TimedOut, "Blocking poll"));
}
Ok(())
}

/// Waits for a buffer to be ready
pub fn is_ready(&self) -> Result<bool, io::Error>{
Ok(self.handle.poll(libc::POLLIN, 0)? > 0)
}
}

impl Drop for Stream {
Expand Down Expand Up @@ -236,7 +252,7 @@ impl<'a> CaptureStream<'a> for Stream {
Ok(self.arena_index)
}

fn next(&'a mut self) -> io::Result<(&Self::Item, &Metadata)> {
fn next(&'a mut self) -> io::Result<(&Self::Item, &Metadata, bool)> {
if !self.active {
// Enqueue all buffers once on stream start
for index in 0..self.arena.bufs.len() {
Expand All @@ -254,6 +270,6 @@ impl<'a> CaptureStream<'a> for Stream {
// will always be valid.
let bytes = &mut self.arena.bufs[self.arena_index].as_ref().unwrap();
let meta = &self.buf_meta[self.arena_index];
Ok((bytes, meta))
Ok((bytes, meta, self.is_ready()?))
}
}
4 changes: 2 additions & 2 deletions src/io/mmap/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ impl<'a, 'b> CaptureStream<'b> for Stream<'a> {
Ok(self.arena_index)
}

fn next(&'b mut self) -> io::Result<(&Self::Item, &Metadata)> {
fn next(&'b mut self) -> io::Result<(&Self::Item, &Metadata, bool)> {
if !self.active {
// Enqueue all buffers once on stream start
for index in 0..self.arena.bufs.len() {
Expand All @@ -205,7 +205,7 @@ impl<'a, 'b> CaptureStream<'b> for Stream<'a> {
// will always be valid.
let bytes = &self.arena.bufs[self.arena_index];
let meta = &self.buf_meta[self.arena_index];
Ok((bytes, meta))
Ok((bytes, meta, false))
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/io/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ pub trait CaptureStream<'a>: Stream {

/// Fetch a new frame by first queueing and then dequeueing.
/// First time initialization is performed if necessary.
fn next(&'a mut self) -> io::Result<(&Self::Item, &Metadata)>;
/// The last item in the tuple is True if another frame is pending.
fn next(&'a mut self) -> io::Result<(&Self::Item, &Metadata, bool)>;
}

pub trait OutputStream<'a>: Stream {
Expand Down
4 changes: 2 additions & 2 deletions src/io/userptr/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ impl<'a> CaptureStream<'a> for Stream {
Ok(self.arena_index)
}

fn next(&'a mut self) -> io::Result<(&Self::Item, &Metadata)> {
fn next(&'a mut self) -> io::Result<(&Self::Item, &Metadata, bool)> {
if !self.active {
// Enqueue all buffers once on stream start
for index in 0..self.arena.bufs.len() {
Expand All @@ -209,6 +209,6 @@ impl<'a> CaptureStream<'a> for Stream {
// will always be valid.
let bytes = &mut self.arena.bufs[self.arena_index];
let meta = &self.buf_meta[self.arena_index];
Ok((bytes, meta))
Ok((bytes, meta, false))
}
}

0 comments on commit af9f952

Please sign in to comment.