Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(virtq): remove intermediate virtqueue channels #1390

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 0 additions & 19 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ virtio = { package = "virtio-spec", version = "0.1", features = ["alloc", "mmio"
ahash = { version = "0.8", default-features = false }
align-address = "0.3"
anstyle = { version = "1", default-features = false }
async-channel = { version = "2.3", default-features = false }
async-lock = { version = "3.4.0", default-features = false }
async-trait = "0.1.83"
bit_field = "0.10"
Expand Down
56 changes: 9 additions & 47 deletions src/drivers/net/virtio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,11 @@ impl CtrlQueue {

pub struct RxQueues {
vqs: Vec<Box<dyn Virtq>>,
poll_sender: async_channel::Sender<UsedBufferToken>,
poll_receiver: async_channel::Receiver<UsedBufferToken>,
packet_size: u32,
}

impl RxQueues {
pub fn new(vqs: Vec<Box<dyn Virtq>>, dev_cfg: &NetDevCfg) -> Self {
let (poll_sender, poll_receiver) = async_channel::unbounded();

// See Virtio specification v1.1 - 5.1.6.3.1
//
let packet_size = if dev_cfg.features.contains(virtio::net::F::MRG_RXBUF) {
Expand All @@ -73,12 +69,7 @@ impl RxQueues {
dev_cfg.raw.as_ptr().mtu().read().to_ne().into()
};

Self {
vqs,
poll_sender,
poll_receiver,
packet_size,
}
Self { vqs, packet_size }
}

/// Takes care of handling packets correctly which need some processing after being received.
Expand All @@ -95,32 +86,12 @@ impl RxQueues {
fn add(&mut self, mut vq: Box<dyn Virtq>) {
const BUFF_PER_PACKET: u16 = 2;
let num_packets: u16 = u16::from(vq.size()) / BUFF_PER_PACKET;
fill_queue(
vq.as_mut(),
num_packets,
self.packet_size,
self.poll_sender.clone(),
);
fill_queue(vq.as_mut(), num_packets, self.packet_size);
self.vqs.push(vq);
}

fn get_next(&mut self) -> Option<UsedBufferToken> {
let transfer = self.poll_receiver.try_recv();

transfer
.or_else(|_| {
// Check if any not yet provided transfers are in the queue.
self.poll();

self.poll_receiver.try_recv()
})
.ok()
}

fn poll(&mut self) {
for vq in &mut self.vqs {
vq.poll();
}
self.vqs[0].try_recv().ok()
}

fn enable_notifs(&mut self) {
Expand All @@ -140,12 +111,7 @@ impl RxQueues {
}
}

fn fill_queue(
vq: &mut dyn Virtq,
num_packets: u16,
packet_size: u32,
poll_sender: async_channel::Sender<UsedBufferToken>,
) {
fn fill_queue(vq: &mut dyn Virtq, num_packets: u16, packet_size: u32) {
for _ in 0..num_packets {
let buff_tkn = match AvailBufferToken::new(
vec![],
Expand All @@ -167,12 +133,7 @@ fn fill_queue(
// BufferTokens are directly provided to the queue
// TransferTokens are directly dispatched
// Transfers will be awaited at the queue
match vq.dispatch(
buff_tkn,
Some(poll_sender.clone()),
false,
BufferType::Direct,
) {
match vq.dispatch(buff_tkn, false, BufferType::Direct) {
Ok(_) => (),
Err(err) => {
error!("{:#?}", err);
Expand Down Expand Up @@ -220,7 +181,9 @@ impl TxQueues {

fn poll(&mut self) {
for vq in &mut self.vqs {
vq.poll();
// We don't do anything with the buffers but we need to receive them for the
// ring slots to be emptied and the memory from the previous transfers to be freed.
while vq.try_recv().is_ok() {}
}
}

Expand Down Expand Up @@ -339,7 +302,7 @@ impl NetworkDriver for VirtioNetDriver {
.unwrap();

self.send_vqs.vqs[0]
.dispatch(buff_tkn, None, false, BufferType::Direct)
.dispatch(buff_tkn, false, BufferType::Direct)
.unwrap();

result
Expand Down Expand Up @@ -373,7 +336,6 @@ impl NetworkDriver for VirtioNetDriver {
self.recv_vqs.vqs[0].as_mut(),
num_buffers,
self.recv_vqs.packet_size,
self.recv_vqs.poll_sender.clone(),
);

let vec_data = packets.into_iter().flatten().collect();
Expand Down
38 changes: 13 additions & 25 deletions src/drivers/virtio/virtqueue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use core::any::Any;
use core::mem::MaybeUninit;
use core::{mem, ptr};

use async_channel::TryRecvError;
use virtio::{le32, le64, pvirtq, virtq};

use self::error::VirtqError;
Expand Down Expand Up @@ -88,8 +87,6 @@ impl From<VqSize> for u16 {
}
}

type UsedBufferTokenSender = async_channel::Sender<UsedBufferToken>;

// Public interface of Virtq

/// The Virtq trait unifies access to the two different Virtqueue types
Expand All @@ -106,7 +103,6 @@ pub trait Virtq {
fn dispatch(
&mut self,
tkn: AvailBufferToken,
sender: Option<UsedBufferTokenSender>,
notif: bool,
buffer_type: BufferType,
) -> Result<(), VirtqError>;
Expand All @@ -125,21 +121,20 @@ pub trait Virtq {
tkn: AvailBufferToken,
buffer_type: BufferType,
) -> Result<UsedBufferToken, VirtqError> {
let (sender, receiver) = async_channel::bounded(1);
self.dispatch(tkn, Some(sender), false, buffer_type)?;
self.dispatch(tkn, false, buffer_type)?;

self.disable_notifs();

let result: UsedBufferToken;
// Keep Spinning until the receive queue is filled
loop {
match receiver.try_recv() {
Ok(buffer_tkn) => {
result = buffer_tkn;
break;
}
Err(TryRecvError::Closed) => return Err(VirtqError::General),
Err(TryRecvError::Empty) => self.poll(),
// TODO: normally, we should check if the used buffer in question is the one
// we just made available. However, this shouldn't be a problem as the queue this
// function is called on makes use of this blocking dispatch function exclusively
// and thus dispatches cannot be interleaved.
if let Ok(buffer_tkn) = self.try_recv() {
result = buffer_tkn;
break;
}
}

Expand All @@ -156,10 +151,7 @@ pub trait Virtq {

/// Checks if new used descriptors have been written by the device.
/// This activates the queue and polls the descriptor ring of the queue.
///
/// * `TransferTokens` which hold an `await_queue` will be placed into
/// these queues.
fn poll(&mut self);
fn try_recv(&mut self) -> Result<UsedBufferToken, VirtqError>;

/// Dispatches a batch of [AvailBufferToken]s. The buffers are provided to the queue in
/// sequence. After the last buffer has been written, the queue marks the first buffer as available and triggers
Expand Down Expand Up @@ -189,7 +181,6 @@ pub trait Virtq {
fn dispatch_batch_await(
&mut self,
tkns: Vec<(AvailBufferToken, BufferType)>,
await_queue: UsedBufferTokenSender,
notif: bool,
) -> Result<(), VirtqError>;

Expand Down Expand Up @@ -242,7 +233,6 @@ trait VirtqPrivate {
/// After this call, the buffers are no longer writable.
fn transfer_token_from_buffer_token(
buff_tkn: AvailBufferToken,
await_queue: Option<UsedBufferTokenSender>,
buffer_type: BufferType,
) -> TransferToken<Self::Descriptor> {
let ctrl_desc = match buffer_type {
Expand All @@ -252,7 +242,6 @@ trait VirtqPrivate {

TransferToken {
buff_tkn,
await_queue,
ctrl_desc,
}
}
Expand Down Expand Up @@ -334,11 +323,6 @@ pub struct TransferToken<Descriptor> {
/// Must be some in order to prevent drop
/// upon reuse.
buff_tkn: AvailBufferToken,
/// Structure which allows to await Transfers
/// If Some, finished TransferTokens will be placed here
/// as finished `Transfers`. If None, only the state
/// of the Token will be changed.
await_queue: Option<UsedBufferTokenSender>,
// Contains the [MemDescr] for the indirect table if the transfer is indirect.
ctrl_desc: Option<Box<[Descriptor]>>,
}
Expand Down Expand Up @@ -616,6 +600,7 @@ pub mod error {
FeatureNotSupported(virtio::F),
AllocationError,
IncompleteWrite,
NoNewUsed,
}

impl core::fmt::Debug for VirtqError {
Expand Down Expand Up @@ -645,6 +630,9 @@ pub mod error {
VirtqError::IncompleteWrite => {
write!(f, "A sized object was partially initialized.")
}
VirtqError::NoNewUsed => {
write!(f, "The queue does not contain any new used buffers.")
}
}
}
}
Expand Down
39 changes: 13 additions & 26 deletions src/drivers/virtio/virtqueue/packed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use super::super::transport::mmio::{ComCfg, NotifCfg, NotifCtrl};
use super::super::transport::pci::{ComCfg, NotifCfg, NotifCtrl};
use super::error::VirtqError;
use super::{
AvailBufferToken, BufferType, MemDescrId, MemPool, TransferToken, UsedBufferToken,
UsedBufferTokenSender, Virtq, VirtqPrivate, VqIndex, VqSize,
AvailBufferToken, BufferType, MemDescrId, MemPool, TransferToken, UsedBufferToken, Virtq,
VirtqPrivate, VqIndex, VqSize,
};
use crate::arch::mm::paging::{BasePageSize, PageSize};
use crate::arch::mm::{paging, VirtAddr};
Expand Down Expand Up @@ -128,21 +128,14 @@ impl DescriptorRing {
}

/// Polls poll index and sets the state of any finished TransferTokens.
/// If [TransferToken::await_queue] is available, the [UsedBufferToken] will be moved to the queue.
fn poll(&mut self) {
fn try_recv(&mut self) -> Result<UsedBufferToken, VirtqError> {
let mut ctrl = self.get_read_ctrler();

if let Some((mut tkn, written_len)) = ctrl.poll_next() {
if let Some(queue) = tkn.await_queue.take() {
// Place the TransferToken in a Transfer, which will hold ownership of the token
queue
.try_send(UsedBufferToken::from_avail_buffer_token(
tkn.buff_tkn,
written_len,
))
.unwrap();
}
}
ctrl.poll_next()
.map(|(tkn, written_len)| {
UsedBufferToken::from_avail_buffer_token(tkn.buff_tkn, written_len)
})
.ok_or(VirtqError::NoNewUsed)
}

fn push_batch(
Expand Down Expand Up @@ -539,8 +532,8 @@ impl Virtq for PackedVq {
self.drv_event.disable_notif();
}

fn poll(&mut self) {
self.descr_ring.poll();
fn try_recv(&mut self) -> Result<UsedBufferToken, VirtqError> {
self.descr_ring.try_recv()
}

fn dispatch_batch(
Expand All @@ -552,7 +545,7 @@ impl Virtq for PackedVq {
assert!(!buffer_tkns.is_empty());

let transfer_tkns = buffer_tkns.into_iter().map(|(buffer_tkn, buffer_type)| {
Self::transfer_token_from_buffer_token(buffer_tkn, None, buffer_type)
Self::transfer_token_from_buffer_token(buffer_tkn, buffer_type)
});

let next_idx = self.descr_ring.push_batch(transfer_tkns)?;
Expand Down Expand Up @@ -581,18 +574,13 @@ impl Virtq for PackedVq {
fn dispatch_batch_await(
&mut self,
buffer_tkns: Vec<(AvailBufferToken, BufferType)>,
await_queue: super::UsedBufferTokenSender,
notif: bool,
) -> Result<(), VirtqError> {
// Zero transfers are not allowed
assert!(!buffer_tkns.is_empty());

let transfer_tkns = buffer_tkns.into_iter().map(|(buffer_tkn, buffer_type)| {
Self::transfer_token_from_buffer_token(
buffer_tkn,
Some(await_queue.clone()),
buffer_type,
)
Self::transfer_token_from_buffer_token(buffer_tkn, buffer_type)
});

let next_idx = self.descr_ring.push_batch(transfer_tkns)?;
Expand Down Expand Up @@ -621,11 +609,10 @@ impl Virtq for PackedVq {
fn dispatch(
&mut self,
buffer_tkn: AvailBufferToken,
sender: Option<UsedBufferTokenSender>,
notif: bool,
buffer_type: BufferType,
) -> Result<(), VirtqError> {
let transfer_tkn = Self::transfer_token_from_buffer_token(buffer_tkn, sender, buffer_type);
let transfer_tkn = Self::transfer_token_from_buffer_token(buffer_tkn, buffer_type);
let next_idx = self.descr_ring.push(transfer_tkn)?;

if notif {
Expand Down
Loading