From e91833a4ba60fff81972ed477c533580ed6552cb Mon Sep 17 00:00:00 2001 From: Eemeli Date: Tue, 21 Nov 2023 22:04:00 +0200 Subject: [PATCH] socket bind, listen, accept and connect syscalls --- crates/driver-acpi/src/hpet.rs | 4 + crates/driver-framebuffer/src/lib.rs | 4 + crates/driver-rtc/src/lib.rs | 4 + crates/kernel/src/syscall.rs | 200 ++++++++++++++++++++++++--- crates/sample-elf/src/io.rs | 6 + crates/sample-elf/src/main.rs | 43 ++++-- crates/scheduler/src/ipc/pipe.rs | 159 ++++++++++++++------- crates/syscall/src/err.rs | 2 + crates/syscall/src/lib.rs | 21 +++ crates/syscall/src/net.rs | 5 + crates/vfs/src/device.rs | 7 + crates/vfs/src/ramdisk.rs | 4 + 12 files changed, 382 insertions(+), 77 deletions(-) diff --git a/crates/driver-acpi/src/hpet.rs b/crates/driver-acpi/src/hpet.rs index 32f5fdc36..8214be282 100644 --- a/crates/driver-acpi/src/hpet.rs +++ b/crates/driver-acpi/src/hpet.rs @@ -423,6 +423,10 @@ pub struct HpetDevice; // impl FileDevice for HpetDevice { + fn as_any(&self) -> &dyn core::any::Any { + self + } + fn len(&self) -> usize { core::mem::size_of::() } diff --git a/crates/driver-framebuffer/src/lib.rs b/crates/driver-framebuffer/src/lib.rs index dc0b76153..a88a33739 100644 --- a/crates/driver-framebuffer/src/lib.rs +++ b/crates/driver-framebuffer/src/lib.rs @@ -12,6 +12,10 @@ pub struct FboDevice; // impl FileDevice for FboDevice { + fn as_any(&self) -> &dyn core::any::Any { + self + } + fn len(&self) -> usize { Self::with(|fbo| fbo.len()) } diff --git a/crates/driver-rtc/src/lib.rs b/crates/driver-rtc/src/lib.rs index 414f7e475..e41f4a734 100644 --- a/crates/driver-rtc/src/lib.rs +++ b/crates/driver-rtc/src/lib.rs @@ -116,6 +116,10 @@ impl Rtc { pub struct RtcDevice; impl FileDevice for RtcDevice { + fn as_any(&self) -> &dyn core::any::Any { + self + } + fn len(&self) -> usize { mem::size_of::() } diff --git a/crates/kernel/src/syscall.rs b/crates/kernel/src/syscall.rs index 3910a40b8..bdcf552df 100644 --- a/crates/kernel/src/syscall.rs +++ b/crates/kernel/src/syscall.rs @@ -1,4 +1,4 @@ -use alloc::{boxed::Box, collections::VecDeque, string::ToString, sync::Arc, vec::Vec}; +use alloc::{boxed::Box, string::ToString, sync::Arc, vec::Vec}; use core::{ any::{type_name_of_val, Any}, sync::atomic::Ordering, @@ -14,7 +14,7 @@ use hyperion_mem::{ vmm::PageMapImpl, }; use hyperion_scheduler::{ - ipc::pipe::Pipe, + ipc::pipe::{Channel, Pipe}, lock::{Futex, Mutex}, process, task::{Process, ProcessExt}, @@ -23,7 +23,7 @@ use hyperion_syscall::{ err::{Error, Result}, fs::FileOpenFlags, id, - net::{Protocol, SocketDomain, SocketType}, + net::{Protocol, SocketDesc, SocketDomain, SocketType}, }; use hyperion_vfs::{ device::FileDevice, @@ -59,6 +59,9 @@ pub fn syscall(args: &mut SyscallRegs) { id::SOCKET => call_id(socket, args), id::BIND => call_id(bind, args), + id::LISTEN => call_id(listen, args), + id::ACCEPT => call_id(accept, args), + id::CONNECT => call_id(connect, args), _ => { debug!("invalid syscall"); @@ -432,6 +435,10 @@ fn socket(args: &mut SyscallRegs) -> Result { let ty = SocketType(args.arg1 as _); let proto = Protocol(args.arg2 as _); + _socket(domain, ty, proto).map(|fd| fd.0) +} + +fn _socket(domain: SocketDomain, ty: SocketType, proto: Protocol) -> Result { if domain != SocketDomain::LOCAL { return Err(Error::INVALID_DOMAIN); } @@ -444,11 +451,21 @@ fn socket(args: &mut SyscallRegs) -> Result { return Err(Error::UNKNOWN_PROTOCOL); } + Ok(_socket_from(SocketFile { + domain, + ty, + proto, + conn: None, + pipe: None, + })) +} + +fn _socket_from(socket: SocketFile) -> SocketDesc { let this = process(); let ext = process_ext_with(&this); let socket = Some(Socket { - socket_ref: Arc::new(Mutex::new(SocketFile {})), + socket_ref: Arc::new(Mutex::new(socket)), }); let mut sockets = ext.sockets.lock(); @@ -466,17 +483,22 @@ fn socket(args: &mut SyscallRegs) -> Result { sockets.push(socket); } - return Ok(fd); + return SocketDesc(fd); } /// bind a socket /// /// [`hyperion_syscall::bind`] fn bind(args: &mut SyscallRegs) -> Result { - let socket = args.arg0 as usize; - let path = read_untrusted_str(args.arg1, args.arg2)?; + let socket = SocketDesc(args.arg0 as _); + let addr = read_untrusted_str(args.arg1, args.arg2)?; - let path = Path::from_str(path); + _bind(socket, addr).map(|_| 0) +} + +fn _bind(socket: SocketDesc, addr: &str) -> Result<()> { + // TODO: this is only for LOCAL domain sockets atm + let path = Path::from_str(addr); let Some((dir, sock_file)) = path.split() else { return Err(Error::NOT_FOUND); }; @@ -486,11 +508,12 @@ fn bind(args: &mut SyscallRegs) -> Result { let sockets = ext.sockets.lock(); let socket_file = sockets - .get(socket) + .get(socket.0) .and_then(|s| s.as_ref()) .ok_or(Error::BAD_FILE_DESCRIPTOR)? .socket_ref .clone(); + drop(sockets); let dir = VFS_ROOT .find_dir(dir, false) @@ -500,9 +523,134 @@ fn bind(args: &mut SyscallRegs) -> Result { .create_node(sock_file, Node::File(socket_file)) .map_err(map_vfs_err_to_syscall_err)?; - Ok(0) + return Ok(()); +} + +/// start listening to connections on a socket +/// +/// [`hyperion_syscall::listen`] +fn listen(args: &mut SyscallRegs) -> Result { + let socket = SocketDesc(args.arg0 as _); + _listen(socket).map(|_| 0) +} + +fn _listen(socket: SocketDesc) -> Result<()> { + let this = process(); + let ext = process_ext_with(&this); + + ext.sockets + .lock() + .get(socket.0) + .and_then(|s| s.as_ref()) + .ok_or(Error::BAD_FILE_DESCRIPTOR)? + .socket_ref + .lock() + .conn = Some(Arc::new(Channel::new())); + + Ok(()) +} + +/// accept a connection on a socket +/// +/// [`hyperion_syscall::accept`] +fn accept(args: &mut SyscallRegs) -> Result { + let socket = SocketDesc(args.arg0 as _); + + _accept(socket).map(|fd| fd.0) } +fn _accept(socket: SocketDesc) -> Result { + let this = process(); + let ext = process_ext_with(&this); + + let sockets = ext.sockets.lock(); + let socket = sockets + .get(socket.0) + .and_then(|s| s.as_ref()) + .ok_or(Error::BAD_FILE_DESCRIPTOR)? + .socket_ref + .clone(); + drop(sockets); + + let mut socket = socket.lock(); + + let domain = socket.domain; + let ty = socket.ty; + let proto = socket.proto; + + // `listen` syscall is not required + let conn = socket + .conn + .get_or_insert_with(|| Arc::new(Channel::new())) + .clone(); + + drop(socket); + + // blocks here + let pipe = conn.recv(); + + Ok(_socket_from(SocketFile { + domain, + ty, + proto, + conn: None, + pipe: Some(pipe), + })) +} + +/// connect to a socket +/// +/// [`hyperion_syscall::connect`] +fn connect(args: &mut SyscallRegs) -> Result { + let socket = SocketDesc(args.arg0 as _); + let addr = read_untrusted_str(args.arg1, args.arg2)?; + + _connect(socket, addr).map(|_| 0) +} + +fn _connect(socket: SocketDesc, addr: &str) -> Result<()> { + let this = process(); + let ext = process_ext_with(&this); + + let sockets = ext.sockets.lock(); + let client = sockets + .get(socket.0) + .and_then(|s| s.as_ref()) + .ok_or(Error::BAD_FILE_DESCRIPTOR)? + .socket_ref + .clone(); + drop(sockets); + + let server = VFS_ROOT + .find_file(addr, false, false) + .map_err(map_vfs_err_to_syscall_err)?; + let server = server.lock(); + + // TODO: inode + let conn = server + .as_any() + .downcast_ref::() + .ok_or(Error::CONNECTION_REFUSED)? + .conn + .as_ref() + .cloned(); // not a socket file + + let Some(conn) = conn else { + return Err(Error::CONNECTION_REFUSED); + }; + + drop(server); + + let pipe = Arc::new(Pipe::new()); + conn.send(pipe.clone()); + + client.lock().pipe = Some(pipe); + + Ok(()) +} + +// + struct ProcessExtra { files: Mutex>>, sockets: Mutex>>, @@ -519,19 +667,39 @@ struct Socket { type SocketRef = Arc>; -struct SocketFile {} +struct SocketFile { + domain: SocketDomain, + ty: SocketType, + proto: Protocol, + + conn: Option>>>, + pipe: Option>, +} impl FileDevice for SocketFile { + fn as_any(&self) -> &dyn Any { + self + } + fn len(&self) -> usize { - 0 + if let Some(pipe) = self.pipe.as_ref() { + let recv = pipe.n_recv.load(Ordering::SeqCst); + let send = pipe.n_send.load(Ordering::SeqCst); + send - recv + } else { + 0 + } } - fn read(&self, offset: usize, buf: &mut [u8]) -> IoResult { - Err(IoError::PermissionDenied) + fn read(&self, _offset: usize, buf: &mut [u8]) -> IoResult { + let pipe = self.pipe.as_ref().ok_or(IoError::PermissionDenied)?; + Ok(pipe.recv_slice(buf)) } - fn write(&mut self, offset: usize, buf: &[u8]) -> IoResult { - Err(IoError::PermissionDenied) + fn write(&mut self, _offset: usize, buf: &[u8]) -> IoResult { + let pipe = self.pipe.as_ref().ok_or(IoError::PermissionDenied)?; + pipe.send_slice(buf); + Ok(buf.len()) } } diff --git a/crates/sample-elf/src/io.rs b/crates/sample-elf/src/io.rs index 0b414459b..df8319baa 100644 --- a/crates/sample-elf/src/io.rs +++ b/crates/sample-elf/src/io.rs @@ -15,6 +15,7 @@ impl Read for SimpleIpcInputChannel { } } +#[allow(dead_code)] pub struct BufReader { buf: [u8; 64], end: u8, @@ -22,6 +23,7 @@ pub struct BufReader { } impl BufReader { + #[allow(dead_code)] pub fn new(read: T) -> Self { Self { buf: [0; 64], @@ -30,10 +32,12 @@ impl BufReader { } } + #[allow(dead_code)] pub fn read_line(&mut self, buf: &mut String) -> Result { unsafe { append_to_string(buf, |b| read_until(self, b'\n', b)) } } + #[allow(dead_code)] fn fill_buf(&mut self) -> Result<&[u8], String> { let bytes_read = self.inner.recv(&mut self.buf[self.end as usize..])?; self.end += bytes_read as u8; @@ -42,12 +46,14 @@ impl BufReader { Ok(&self.buf[..self.end as usize]) } + #[allow(dead_code)] fn consume(&mut self, used: usize) { self.buf.rotate_left(used); self.end -= used as u8; } } +#[allow(dead_code)] fn read_until( r: &mut BufReader, delim: u8, diff --git a/crates/sample-elf/src/main.rs b/crates/sample-elf/src/main.rs index bab1351f8..74539e857 100644 --- a/crates/sample-elf/src/main.rs +++ b/crates/sample-elf/src/main.rs @@ -6,31 +6,27 @@ extern crate alloc; -use alloc::{boxed::Box, format, string::String, sync::Arc}; +use alloc::boxed::Box; use core::{ alloc::GlobalAlloc, fmt::{self, Write}, ptr::NonNull, - sync::atomic::{AtomicUsize, Ordering}, }; use hyperion_syscall::{ - fs::{File, OpenOptions}, net::{Protocol, SocketDomain, SocketType}, *, }; -use crate::io::{BufReader, SimpleIpcInputChannel}; - // mod io; // partial std::io // -pub fn main(args: CliArgs) { - println!("sample app main"); - println!("args: {args:?}"); +pub fn main(_args: CliArgs) { + // println!("sample app main"); + // println!("args: {args:?}"); // for i in 1..13 { // if i == 2 || i == 8 { @@ -39,12 +35,33 @@ pub fn main(args: CliArgs) { // println!("{:?}", unsafe { syscall_0(i) }); // } - let sock = - hyperion_syscall::socket(SocketDomain::LOCAL, SocketType::STREAM, Protocol::LOCAL).unwrap(); + spawn(|| { + let server = socket(SocketDomain::LOCAL, SocketType::STREAM, Protocol::LOCAL).unwrap(); + + bind(server, "/dev/server.sock").unwrap(); + + let _conn = accept(server).unwrap(); + + println!("connected"); + + // TODO: send(conn, b"Hello").unwrap(); + }); - hyperion_syscall::bind(sock, "/dev/server.sock").unwrap(); + // wait for the server to be up + nanosleep(50_000_000); - match args.iter().next().expect("arg0 to be present") { + let client = socket(SocketDomain::LOCAL, SocketType::STREAM, Protocol::LOCAL).unwrap(); + + connect(client, "/dev/server.sock").unwrap(); + + println!("connected"); + + // TODO: + // let mut buf = [0u8; 64]; + // let len = recv(client, &mut buf).unwrap(); + // assert_eq!(&buf[..len], b"Hello"); + + /* match args.iter().next().expect("arg0 to be present") { // busybox style single binary 'coreutils' "/bin/run" => { let inc = Arc::new(AtomicUsize::new(0)); @@ -167,7 +184,7 @@ pub fn main(args: CliArgs) { } tool => panic!("unknown tool {tool}"), - } + } */ } // diff --git a/crates/scheduler/src/ipc/pipe.rs b/crates/scheduler/src/ipc/pipe.rs index e738fce6e..4cc5e0040 100644 --- a/crates/scheduler/src/ipc/pipe.rs +++ b/crates/scheduler/src/ipc/pipe.rs @@ -7,89 +7,152 @@ use crate::{futex, lock::Mutex, process, task::Pid}; // +pub type Pipe = Channel<0x1000, u8>; + /// simple P2P 2-copy IPC channel -pub struct Pipe { +pub struct Channel { /// the actual data channel - pub send: Mutex>, - pub recv: Mutex>, + pub send: Mutex>, + pub recv: Mutex>, - pub items: AtomicUsize, + pub n_send: AtomicUsize, + pub n_recv: AtomicUsize, } -impl Pipe { +impl Channel { pub fn new() -> Self { // TODO: custom allocator - let (send, recv) = ringbuf::HeapRb::new(0x1000).split(); + let (send, recv) = ringbuf::HeapRb::new(MAX).split(); let (send, recv) = (Mutex::new(send), Mutex::new(recv)); Self { send, recv, - items: AtomicUsize::new(0), + n_send: AtomicUsize::new(0), + n_recv: AtomicUsize::new(0), } } -} -impl Default for Pipe { - fn default() -> Self { - Self::new() - } -} + pub fn send(&self, mut item: T) { + let mut stream = self.send.lock(); + loop { + let n_recv = self.n_recv.load(Ordering::Acquire); + if let Err(overflow) = stream.push(item) { + // wake up a reader + futex::wake(NonNull::from(&self.n_send), 1); -// + // sleep with the send stream lock + futex::wait(NonNull::from(&self.n_recv), n_recv); -pub fn send(target_pid: Pid, data: &[u8]) -> Result<(), &'static str> { - if data.is_empty() { - return Ok(()); - } + // keep trying to send the item + item = overflow; + } else { + self.n_send.fetch_add(1, Ordering::Release); - let proc = target_pid.find().ok_or("no such process")?; - let pipe = &proc.simple_ipc; + // wake up a reader + futex::wake(NonNull::from(&self.n_send), 1); - let mut stream = pipe.send.lock(); - let mut data = data; - loop { - if data.is_empty() { - return Ok(()); + return; + }; } + } - let sent = stream.push_slice(data); - data = &data[sent..]; + pub fn recv(&self) -> T { + let mut stream = self.recv.lock(); + loop { + let n_send = self.n_send.load(Ordering::Acquire); + if let Some(item) = stream.pop() { + self.n_recv.fetch_add(1, Ordering::Release); - pipe.items.fetch_add(sent, Ordering::Release); + // wake up a sender + futex::wake(NonNull::from(&self.n_recv), 1); - // wake up a reader - futex::wake(NonNull::from(&pipe.items), 1); + return item; + } else { + // wake up a sender + futex::wake(NonNull::from(&self.n_recv), 1); - // sleep with the send stream lock - futex::wait(NonNull::from(&pipe.items), 0x1000); + // sleep with the recv stream lock + futex::wait(NonNull::from(&self.n_send), n_send); + } + } } } -pub fn recv(buf: &mut [u8]) -> usize { - if buf.is_empty() { - return 0; +impl Channel +where + T: Copy, +{ + pub fn send_slice(&self, data: &[T]) { + if data.is_empty() { + return; + } + + let mut stream = self.send.lock(); + let mut data = data; + loop { + let n_recv = self.n_recv.load(Ordering::Acquire); + let sent = stream.push_slice(data); + data = &data[sent..]; + + self.n_send.fetch_add(sent, Ordering::Release); + + // wake up a reader + futex::wake(NonNull::from(&self.n_send), 1); + + if data.is_empty() { + // if not full + return; + } + + // sleep with the send stream lock + futex::wait(NonNull::from(&self.n_recv), n_recv); + } } - let proc = process(); - let pipe = &proc.simple_ipc; + pub fn recv_slice(&self, buf: &mut [T]) -> usize { + if buf.is_empty() { + return 0; + } + + let mut stream = self.recv.lock(); + loop { + let n_send = self.n_send.load(Ordering::Acquire); + let count = stream.pop_slice(buf); - let mut stream = pipe.recv.lock(); - loop { - let count = stream.pop_slice(buf); + self.n_recv.fetch_add(count, Ordering::Release); - // can race and wrap from -1, but it doesn't matter - pipe.items.fetch_sub(count, Ordering::Release); + // wake up a sender + futex::wake(NonNull::from(&self.n_recv), 1); - // wake up a sender - futex::wake(NonNull::from(&pipe.items), 1); + if count != 0 { + return count; + } - if count != 0 { - return count; + // sleep with the recv stream lock + futex::wait(NonNull::from(&self.n_send), n_send); } + } +} - // sleep with the recv stream lock - futex::wait(NonNull::from(&pipe.items), 0); +impl Default for Channel { + fn default() -> Self { + Self::new() } } + +// + +pub fn send(target_pid: Pid, data: &[u8]) -> Result<(), &'static str> { + target_pid + .find() + .ok_or("no such process")? + .simple_ipc + .send_slice(data); + Ok(()) +} + +pub fn recv(buf: &mut [u8]) -> usize { + process().simple_ipc.recv_slice(buf) +} diff --git a/crates/syscall/src/err.rs b/crates/syscall/src/err.rs index 6328b5496..bdc5c9c78 100644 --- a/crates/syscall/src/err.rs +++ b/crates/syscall/src/err.rs @@ -52,6 +52,8 @@ impl_error! { pub const INVALID_TYPE: "invalid socket type" = 19; pub const UNKNOWN_PROTOCOL: "unknown protocol" = 20; + pub const CONNECTION_REFUSED: "connection refused" = 21; + pub const _: "unknown error" = _; } diff --git a/crates/syscall/src/lib.rs b/crates/syscall/src/lib.rs index f42a7e174..f3c6191f5 100644 --- a/crates/syscall/src/lib.rs +++ b/crates/syscall/src/lib.rs @@ -37,6 +37,9 @@ pub mod id { pub const SOCKET: usize = 2000; pub const BIND: usize = 2100; + pub const LISTEN: usize = 2200; + pub const ACCEPT: usize = 2300; + pub const CONNECT: usize = 2400; } // @@ -226,3 +229,21 @@ pub fn socket(domain: SocketDomain, ty: SocketType, protocol: Protocol) -> Resul pub fn bind(socket: SocketDesc, addr: &str) -> Result<()> { unsafe { syscall_3(id::BIND, socket.0, addr.as_ptr() as _, addr.len()) }.map(|_| {}) } + +/// start listening for connections on a socket +#[inline(always)] +pub fn listen(socket: SocketDesc) -> Result<()> { + unsafe { syscall_1(id::LISTEN, socket.0) }.map(|_| {}) +} + +/// accept a connection on a socket +#[inline(always)] +pub fn accept(socket: SocketDesc) -> Result { + unsafe { syscall_1(id::ACCEPT, socket.0) }.map(SocketDesc) +} + +/// connect to a socket +#[inline(always)] +pub fn connect(socket: SocketDesc, addr: &str) -> Result<()> { + unsafe { syscall_3(id::CONNECT, socket.0, addr.as_ptr() as _, addr.len()) }.map(|_| {}) +} diff --git a/crates/syscall/src/net.rs b/crates/syscall/src/net.rs index ad8d8f898..541f4fa6f 100644 --- a/crates/syscall/src/net.rs +++ b/crates/syscall/src/net.rs @@ -29,3 +29,8 @@ impl Protocol { pub const UNIX: Self = Self::LOCAL; pub const LOCAL: Self = Self(0); } + +// + +// #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +// pub struct SocketStream(pub usize); diff --git a/crates/vfs/src/device.rs b/crates/vfs/src/device.rs index 8c3b6652d..0ee5d07a8 100644 --- a/crates/vfs/src/device.rs +++ b/crates/vfs/src/device.rs @@ -1,4 +1,5 @@ use alloc::sync::Arc; +use core::any::Any; use lock_api::RawMutex; @@ -10,6 +11,8 @@ use crate::{ // pub trait FileDevice: Send + Sync { + fn as_any(&self) -> &dyn Any; + fn len(&self) -> usize; fn is_empty(&self) -> bool { @@ -68,6 +71,10 @@ pub trait DirectoryDevice: Send + Sync { // impl FileDevice for [u8] { + fn as_any(&self) -> &dyn Any { + panic!() + } + fn len(&self) -> usize { <[u8]>::len(self) } diff --git a/crates/vfs/src/ramdisk.rs b/crates/vfs/src/ramdisk.rs index 93b4ffeff..8bc95430e 100644 --- a/crates/vfs/src/ramdisk.rs +++ b/crates/vfs/src/ramdisk.rs @@ -30,6 +30,10 @@ pub struct Directory { // impl FileDevice for File { + fn as_any(&self) -> &dyn core::any::Any { + self + } + fn len(&self) -> usize { self.bytes.len() }