Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 28 additions & 30 deletions kernel/src/driver/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,30 +203,38 @@ impl IfaceCommon {
D: smoltcp::phy::Device + ?Sized,
{
let timestamp = crate::time::Instant::now().into();
let mut sockets = self.sockets.lock_irqsave();
let mut interface = self.smol_iface.lock_irqsave();

let (has_events, poll_at) = {
(
matches!(
interface.poll(timestamp, device, &mut sockets),
smoltcp::iface::PollResult::SocketStateChanged
),
loop {
let poll_at = interface.poll_at(timestamp, &sockets);
let Some(instant) = poll_at else {
break poll_at;
};
if instant > timestamp {
break poll_at;
}
},
)
// 逐包处理并通知
loop {
let mut sockets = self.sockets.lock_irqsave();
let result = interface.poll_ingress_single(timestamp, device, &mut sockets);
drop(sockets);
match result {
smoltcp::iface::PollIngressSingleResult::None => break,
smoltcp::iface::PollIngressSingleResult::PacketProcessed => {},
smoltcp::iface::PollIngressSingleResult::SocketStateChanged => {
self.bounds.read_irqsave().iter().for_each(|bound_socket| {
bound_socket.notify();
let _woke = bound_socket
.wait_queue()
.wakeup(Some(ProcessState::Blocked(true)));
});
}
}
Comment on lines +210 to +224
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

能说说你这里为什么要逐包处理吗
之前的写法可以一次性处理多个包,然后唤醒bound_sockets,然后可以同时唤醒多个在等待数据的socket
但是现在你这样的写法,每次有一个包的到来,都会去唤醒整个bound_sockets,但是最终只有一个socket真正收到数据,会不会有点不太必要

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

之前的写法能够一次处理多个包,关键在于提前bind backlog个数量的socket,这个pr希望实现类似于linux动态扩展wait_queue的机制。考虑过两种解决方案:

  1. 逐包处理,将处理粒度变小,让上层TCP_Socket可以及时扩充队列大小以供新的连接
  2. 在smoltcp层面,提供一个listen接口,并维护一个wait_queue来根据连接数量动态扩展
    方案2相对优雅,且可以对smoltcp的分发机制进行优化,例如用hashmap来维护sockets set,但需要对smoltcp进行修改。
    是否有更好的方案,或者是否有准备对smoltcp进行优化

}
let mut sockets = self.sockets.lock_irqsave();
let _ = interface.poll_egress(timestamp, device, &mut sockets);
let poll_at = loop {
let poll_at = interface.poll_at(timestamp, &sockets);
let Some(instant) = poll_at else {
break poll_at;
};
if instant > timestamp {
break poll_at;
}
};

// drop sockets here to avoid deadlock
drop(interface);
drop(sockets);

use core::sync::atomic::Ordering;
if let Some(instant) = poll_at {
Expand All @@ -242,16 +250,6 @@ impl IfaceCommon {
self.poll_at_ms.store(0, Ordering::Relaxed);
}

self.bounds.read_irqsave().iter().for_each(|bound_socket| {
// incase our inet socket missed the event, we manually notify it each time we poll
if has_events {
bound_socket.notify();
let _woke = bound_socket
.wait_queue()
.wakeup(Some(ProcessState::Blocked(true)));
}
});

// TODO: remove closed sockets
// let closed_sockets = self
// .closing_sockets
Expand Down
152 changes: 85 additions & 67 deletions kernel/src/net/socket/inet/stream/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ use crate::filesystem::epoll::EPollEventType;
use crate::libs::rwlock::RwLock;
use crate::net::socket::{self, inet::Types};
use alloc::boxed::Box;
use alloc::vec::Vec;
use alloc::sync::Arc;
use alloc::collections::VecDeque;
use smoltcp;
use smoltcp::socket::tcp;
use system_error::SystemError;
use log::debug;

// pub const DEFAULT_METADATA_BUF_SIZE: usize = 1024;
pub const DEFAULT_RX_BUF_SIZE: usize = 128 * 1024;
Expand Down Expand Up @@ -134,25 +136,7 @@ impl Init {
smoltcp::wire::IpListenEndpoint::from(local)
};
log::debug!("listen at {:?}", listen_addr);
let mut inners = Vec::new();
if let Err(err) = || -> Result<(), SystemError> {
for _ in 0..(backlog - 1) {
// -1 because the first one is already bound
let new_listen = socket::inet::BoundInner::bind(
new_listen_smoltcp_socket(listen_addr),
listen_addr
.addr
.as_ref()
.unwrap_or(&smoltcp::wire::IpAddress::from(
smoltcp::wire::Ipv4Address::UNSPECIFIED,
)),
)?;
inners.push(new_listen);
}
Ok(())
}() {
return Err((Init::Bound((inner, local)), err));
}


if let Err(err) = inner.with_mut::<smoltcp::socket::tcp::Socket, _, _>(|socket| {
socket
Expand All @@ -162,11 +146,12 @@ impl Init {
return Err((Init::Bound((inner, local)), err));
}

inners.push(inner);
let mut inners = VecDeque::new();
inners.push_back(Arc::new(inner));
return Ok(Listening {
inners,
connect: AtomicUsize::new(0),
inners: RwLock::new(inners),
listen_addr,
backlog,
});
}

Expand Down Expand Up @@ -289,55 +274,78 @@ impl Connecting {

#[derive(Debug)]
pub struct Listening {
inners: Vec<socket::inet::BoundInner>,
connect: AtomicUsize,
inners: RwLock<VecDeque<Arc<socket::inet::BoundInner>>>,
backlog: usize,
listen_addr: smoltcp::wire::IpListenEndpoint,
}

impl Listening {
pub fn accept(&mut self) -> Result<(Established, smoltcp::wire::IpEndpoint), SystemError> {
let connected: &mut socket::inet::BoundInner = self
.inners
.get_mut(self.connect.load(core::sync::atomic::Ordering::Relaxed))
.unwrap();
let inners_guard = self.inners.read();
let inner = match inners_guard.front() {
Some(v) => v,
None => return Err(SystemError::EAGAIN_OR_EWOULDBLOCK),
};

if connected.with::<smoltcp::socket::tcp::Socket, _, _>(|socket| !socket.is_active()) {
if inner.with::<smoltcp::socket::tcp::Socket, _, _>(|socket| !socket.is_active()) {
return Err(SystemError::EAGAIN_OR_EWOULDBLOCK);
}
drop(inners_guard);

let mut inners_writer = self.inners.write();
let connected = match inners_writer.pop_front() {
Some(v) => v,
None => return Err(SystemError::EAGAIN_OR_EWOULDBLOCK),
};

if inners_writer.len() == 0 {
let new_listen = match socket::inet::BoundInner::bind(
new_listen_smoltcp_socket(self.listen_addr),
self.listen_addr
.addr
.as_ref()
.unwrap_or(&smoltcp::wire::IpAddress::from(
smoltcp::wire::Ipv4Address::UNSPECIFIED,
)),
) {
Ok(inner) => inner,
Err(_) => return Err(SystemError::EAGAIN_OR_EWOULDBLOCK),
};
inners_writer.push_back(Arc::new(new_listen));
}

let remote_endpoint = connected.with::<smoltcp::socket::tcp::Socket, _, _>(|socket| {
socket
.remote_endpoint()
.expect("A Connected Tcp With No Remote Endpoint")
});

// log::debug!("local at {:?}", local_endpoint);

let mut new_listen = socket::inet::BoundInner::bind(
new_listen_smoltcp_socket(self.listen_addr),
self.listen_addr
.addr
.as_ref()
.unwrap_or(&smoltcp::wire::IpAddress::from(
smoltcp::wire::Ipv4Address::UNSPECIFIED,
)),
)?;

// swap the connected socket with the new_listen socket
// TODO is smoltcp socket swappable?
core::mem::swap(&mut new_listen, connected);

return Ok((Established { inner: new_listen }, remote_endpoint));
return Ok((Established { inner: Arc::try_unwrap(connected).unwrap() }, remote_endpoint));
}

pub fn update_io_events(&self, pollee: &AtomicUsize) {
let position = self.inners.iter().position(|inner| {
inner.with::<smoltcp::socket::tcp::Socket, _, _>(|socket| socket.is_active())
});
let mut inners_guard = self.inners.write();
let inner = match inners_guard.back() {
Some(inner) => inner,
None => return debug!("the tcp socket inners is empty"),
Copy link

Copilot AI Oct 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using debug! macro in a return statement is unconventional. Consider using a separate debug! call followed by return, or return an appropriate value/error.

Suggested change
None => return debug!("the tcp socket inners is empty"),
None => {
debug!("the tcp socket inners is empty");
return;
}

Copilot uses AI. Check for mistakes.
};

if let Some(position) = position {
self.connect
.store(position, core::sync::atomic::Ordering::Relaxed);
if inner.with::<smoltcp::socket::tcp::Socket, _, _>(|socket| socket.is_active()) {
if inners_guard.len() < self.backlog {
let new_listen = match socket::inet::BoundInner::bind(
new_listen_smoltcp_socket(self.listen_addr),
self.listen_addr
.addr
.as_ref()
.unwrap_or(&smoltcp::wire::IpAddress::from(
smoltcp::wire::Ipv4Address::UNSPECIFIED,
)),
) {
Ok(inner) => inner,
Err(e) => return debug!("bind err: {:#?}", e),
Copy link

Copilot AI Oct 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using debug! macro in a return statement is unconventional. Consider using a separate debug! call followed by return, or return an appropriate value/error.

Copilot uses AI. Check for mistakes.
};
inners_guard.push_back(Arc::new(new_listen));
}
pollee.fetch_or(
EPollEventType::EPOLL_LISTEN_CAN_ACCEPT.bits() as usize,
core::sync::atomic::Ordering::Relaxed,
Expand All @@ -364,18 +372,16 @@ impl Listening {
pub fn close(&self) {
// log::debug!("Close Listening Socket");
let port = self.get_name().port;
for inner in self.inners.iter() {
let inners_guard = self.inners.read();
for inner in inners_guard.iter() {
inner.with_mut::<smoltcp::socket::tcp::Socket, _, _>(|socket| socket.close());
inner.port_manager().unbind_port(Types::Tcp, port);
}
self.inners[0]
.iface()
.port_manager()
.unbind_port(Types::Tcp, port);
}

pub fn release(&self) {
// log::debug!("Release Listening Socket");
for inner in self.inners.iter() {
for inner in self.inners.read().iter() {
inner.release();
}
}
Expand Down Expand Up @@ -489,8 +495,14 @@ impl Inner {
Inner::Init(_) => DEFAULT_TX_BUF_SIZE,
Inner::Connecting(conn) => conn.with_mut(|socket| socket.send_capacity()),
// only the first socket in the list is used for sending
Inner::Listening(listen) => listen.inners[0]
.with_mut::<smoltcp::socket::tcp::Socket, _, _>(|socket| socket.send_capacity()),
Inner::Listening(listen) => {
listen
.inners
.read()
.front()
.unwrap()
.with_mut::<smoltcp::socket::tcp::Socket, _, _>(|socket| socket.send_capacity())
}
Inner::Established(est) => est.with_mut(|socket| socket.send_capacity()),
}
}
Expand All @@ -500,18 +512,24 @@ impl Inner {
Inner::Init(_) => DEFAULT_RX_BUF_SIZE,
Inner::Connecting(conn) => conn.with_mut(|socket| socket.recv_capacity()),
// only the first socket in the list is used for receiving
Inner::Listening(listen) => listen.inners[0]
.with_mut::<smoltcp::socket::tcp::Socket, _, _>(|socket| socket.recv_capacity()),
Inner::Listening(listen) => {
listen
.inners
.read()
.front()
.unwrap()
.with_mut::<smoltcp::socket::tcp::Socket, _, _>(|socket| socket.recv_capacity())
}
Inner::Established(est) => est.with_mut(|socket| socket.recv_capacity()),
}
}

pub fn iface(&self) -> Option<&alloc::sync::Arc<dyn crate::driver::net::Iface>> {
pub fn iface(&self) -> Option<alloc::sync::Arc<dyn crate::driver::net::Iface>> {
match self {
Inner::Init(_) => None,
Inner::Connecting(conn) => Some(conn.inner.iface()),
Inner::Listening(listen) => Some(listen.inners[0].iface()),
Inner::Established(est) => Some(est.inner.iface()),
Inner::Connecting(conn) => Some(conn.inner.iface().clone()),
Inner::Listening(listen) => Some(listen.inners.read().front().unwrap().iface().clone()),
Inner::Established(est) => Some(est.inner.iface().clone()),
Comment on lines +531 to +532
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里直接unwarp吗,加个判断是否为空吧,为空的话就返回None

}
}
}
Loading
Loading