diff --git a/examples/tcp_echo.rs b/examples/tcp_echo.rs new file mode 100644 index 0000000..1966500 --- /dev/null +++ b/examples/tcp_echo.rs @@ -0,0 +1,20 @@ +use futures_lite::FutureExt; +use wove::Wove; + +pub async fn go(wove: &Wove) -> std::io::Result<()> { + let listener = wove::net::TcpListener::bind(wove, "localhost:3000").await?; + + todo!() +} + +pub fn main() { + let wove = Wove::new().unwrap(); + let fut = async { + let run = async { wove.run().await? }; + let go = go(&wove); + + go.race(run).await + }; + + pollster::block_on(fut).unwrap(); +} diff --git a/src/fs.rs b/src/fs.rs index 03147ee..428cb59 100644 --- a/src/fs.rs +++ b/src/fs.rs @@ -1,15 +1,15 @@ use std::path::PathBuf; -use crate::{aliases::IoResult, plat::Platform, Wove}; +use crate::{aliases::IoResult, plat_impl, Wove}; pub async fn read_to_string(wove: &Wove, path: PathBuf) -> IoResult { - let mut file = wove.platform.open_file(path).await?; + let mut file = plat_impl::open_file(&wove.platform, path).await?; let mut out = Vec::new(); loop { let buf = vec![0; 4096].into_boxed_slice(); - let buf = wove.platform.read(&mut file, out.len(), buf).await?; - dbg!(buf.len()); + let buf = plat_impl::read(&wove.platform, &mut file, out.len(), buf).await?; + if buf.len() == 0 { break; } diff --git a/src/lib.rs b/src/lib.rs index b1b9020..0606162 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,5 @@ #![feature(async_closure)] +#![feature(never_type)] mod aliases; pub mod fs; @@ -6,23 +7,47 @@ pub mod net; mod plat; mod wove; +use std::{future::Future, pin::Pin, task::Poll}; + use aliases::IoResult; #[cfg(target_os = "linux")] -pub use plat::linux::PlatformLinux as PlatImpl; -use plat::Platform; +pub(crate) use plat::linux as plat_impl; + +pub struct Tock(bool); +impl Tock { + fn new() -> Self { + Self(false) + } +} + +impl Future for Tock { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + if self.0 { + return Poll::Ready(()); + } + + let this = unsafe { self.get_unchecked_mut() }; + this.0 = true; + cx.waker().wake_by_ref(); + + Poll::Pending + } +} pub struct Wove { - platform: PlatImpl, + platform: plat_impl::Platform, } impl Wove { pub fn new() -> IoResult { Ok(Self { - platform: PlatImpl::new(Default::default())?, + platform: plat_impl::Platform::new()?, }) } - pub async fn run(&self) -> IoResult<()> { + pub async fn run(&self) -> IoResult { loop { self.platform.tick().await?; } @@ -34,6 +59,7 @@ mod test { use std::path::PathBuf; use crate::Wove; + use futures_lite::future::try_zip; #[test] fn sketch() { @@ -42,16 +68,21 @@ mod test { let fut = async { let run = async { wove.run().await?; - Ok("".to_string()) }; - let contents = crate::fs::read_to_string(wove, PathBuf::from("src/lib.rs")); - let contents = futures_lite::future::race(run, contents).await; + let hello = crate::fs::read_to_string(wove, PathBuf::from("test_data/hello.txt")); + let goodbye = + crate::fs::read_to_string(wove, PathBuf::from("test_data/goodbye.txt")); + let both = try_zip(hello, goodbye); + + let contents = futures_lite::future::race(both, run).await; contents }; - let out = pollster::block_on(fut).unwrap(); - assert_eq!(out, ""); + let (hello, goodbye) = pollster::block_on(fut).unwrap(); + + assert_eq!(hello, "hello!\n"); + assert_eq!(goodbye, "goodbye!\n"); } } diff --git a/src/net.rs b/src/net.rs index 8b13789..a523656 100644 --- a/src/net.rs +++ b/src/net.rs @@ -1 +1,25 @@ +use std::net::ToSocketAddrs; +use crate::{aliases::IoResult, plat_impl, Wove}; + +pub struct TcpListener(plat_impl::FileHandle); + +impl TcpListener { + pub async fn bind(wove: &Wove, addrs: impl ToSocketAddrs) -> IoResult { + // TODO(Blocking): to_socket_addrs can block + + let mut last_err = None; + for addr in addrs.to_socket_addrs()? { + match plat_impl::open_tcp_socket(&wove.platform, addr).await { + Ok(v) => return Ok(TcpListener(v)), + Err(e) => last_err = Some(e), + } + } + + if let Some(last_err) = last_err { + return Err(last_err); + } + + Err(std::io::Error::other("No addrs returned")) + } +} diff --git a/src/plat/linux.rs b/src/plat/linux.rs index 5348240..a32280b 100644 --- a/src/plat/linux.rs +++ b/src/plat/linux.rs @@ -1,16 +1,31 @@ -use std::{ffi::CString, future::Future, path::PathBuf, pin::Pin}; +use std::{ + ffi::CString, future::Future, net::IpAddr, net::SocketAddr, path::PathBuf, pin::Pin, +}; use io_uring::IoUring; -use crate::aliases::IoResult; - -use super::Platform; +use crate::{aliases::IoResult, Tock}; pub type CallbackFuture = Pin + 'static>>; pub type Callback = Box CallbackFuture + 'static>; -fn box_callback_as_u64(callback: impl async FnOnce(io_uring::cqueue::Entry) + 'static) -> u64 { - let bx: Callback = Box::new(move |entry| Box::pin(callback(entry))); +fn handle_error(i: i32) -> IoResult { + if i < 0 { + return Err(std::io::Error::from_raw_os_error(-i)); + } + + Ok(i) +} + +fn box_callback_as_u64( + tx: async_channel::Sender>, + callback: impl async FnOnce(io_uring::cqueue::Entry) -> IoResult + 'static, +) -> u64 { + let bx: Callback = Box::new(move |entry| { + Box::pin(async move { + tx.send(callback(entry).await).await.unwrap(); + }) + }); let bx: Box = Box::new(bx); let leaked = Box::leak(bx); let ptr = leaked as *mut _; @@ -26,39 +41,21 @@ unsafe fn u64_as_callback(v: u64) -> Box { pub struct PlatformLinux { uring: io_uring::IoUring, - tx_should_tick: async_channel::Sender<()>, - rx_should_tick: async_channel::Receiver<()>, - _pd: core::marker::PhantomData>, + _pd: core::marker::PhantomData>, } +pub type FileHandle = io_uring::types::Fd; impl PlatformLinux { - fn set_should_tick(&self) { - self.tx_should_tick.force_send(()).unwrap(); - } - - async fn wait_tick(&self) { - self.rx_should_tick.recv().await.unwrap() - } -} -impl Platform for PlatformLinux { - type NewOptions = (); - type FileHandle = io_uring::types::Fd; - - fn new(opts: Self::NewOptions) -> IoResult { - let (tx_should_tick, rx_should_tick) = async_channel::bounded(1); - + pub fn new() -> IoResult { Ok(Self { uring: IoUring::new(256)?, - tx_should_tick, - rx_should_tick, _pd: Default::default(), }) } - async fn tick(&self) -> IoResult<()> { - self.wait_tick().await; + pub async fn tick(&self) -> IoResult<()> { self.uring.submit()?; let cq = unsafe { self.uring.completion_shared() }; @@ -67,58 +64,104 @@ impl Platform for PlatformLinux { cb(entry).await; } + Tock::new().await; + Ok(()) } +} - async fn open_file(&self, path: PathBuf) -> IoResult { - let (tx, rx) = async_channel::bounded(1); +pub async fn open_file(p: &PlatformLinux, path: PathBuf) -> IoResult { + let (tx, rx) = async_channel::bounded(1); - let path = CString::new(path.into_os_string().into_encoded_bytes())?; - let entry = - io_uring::opcode::OpenAt::new(io_uring::types::Fd(libc::AT_FDCWD), path.as_ptr()) - .build() - .user_data(box_callback_as_u64(async move |entry| { - let fd = entry.result(); - let fd = io_uring::types::Fd(fd); - - tx.send(fd).await.unwrap(); - std::mem::drop(path); - })); - - unsafe { - self.uring.submission_shared().push(&entry).unwrap(); - } - - self.set_should_tick(); - Ok(rx.recv().await.unwrap()) - } - - async fn read( - &self, - f: &mut Self::FileHandle, - offset: usize, - mut buf: Box<[u8]>, - ) -> IoResult> { - let (tx, rx) = async_channel::bounded(1); - - let entry = io_uring::opcode::Read::new(*f, buf.as_mut_ptr(), buf.len() as _) - .offset(offset as _) + let path = CString::new(path.into_os_string().into_encoded_bytes())?; + let entry = + io_uring::opcode::OpenAt::new(io_uring::types::Fd(libc::AT_FDCWD), path.as_ptr()) .build() - .user_data(box_callback_as_u64(async move |entry| { - let read_amt = entry.result(); - dbg!(read_amt); + .user_data(box_callback_as_u64(tx, async move |entry| { + let fd = handle_error(entry.result())?; + let fd = io_uring::types::Fd(fd); - let mut buf = buf.into_vec(); - buf.truncate(read_amt as _); + std::mem::drop(path); - tx.send(buf.into_boxed_slice()).await.unwrap() + Ok(fd) })); - unsafe { - self.uring.submission_shared().push(&entry).unwrap(); - } - - self.set_should_tick(); - Ok(rx.recv().await.unwrap()) + unsafe { + p.uring.submission_shared().push(&entry).unwrap(); } + + rx.recv().await.unwrap() } + +pub async fn open_tcp_socket( + p: &PlatformLinux, + socket_addr: SocketAddr, +) -> IoResult { + let (tx, rx) = async_channel::bounded(1); + + let domain = match () { + _ if socket_addr.is_ipv4() => libc::AF_INET, + _ if socket_addr.is_ipv6() => libc::AF_INET6, + _ => return Err(std::io::Error::other("Unsupported domain")), + }; + + let entry = io_uring::opcode::Socket::new(domain, libc::SOCK_STREAM, 0) + .build() + .user_data(box_callback_as_u64(tx, async move |entry| { + let res = handle_error(entry.result())?; + + let sock = libc::sockaddr_in { + sin_family: domain as _, + sin_port: socket_addr.port(), + sin_addr: libc::in_addr { + s_addr: match socket_addr.ip() { + IpAddr::V4(v) => v.to_bits(), + IpAddr::V6(v) => panic!(), + }, + }, + sin_zero: Default::default(), + }; + + // FIXME(Blocking) + let fd = handle_error(unsafe { + libc::bind( + res, + &sock as *const _ as *const _, + core::mem::size_of_val(&sock) as u32, + ) + })?; + + Ok(io_uring::types::Fd(fd)) + })); + + rx.recv().await.unwrap() +} + +pub async fn read( + p: &PlatformLinux, + f: &mut FileHandle, + offset: usize, + mut buf: Box<[u8]>, +) -> IoResult> { + let (tx, rx) = async_channel::bounded(1); + + let entry = io_uring::opcode::Read::new(*f, buf.as_mut_ptr(), buf.len() as _) + .offset(offset as _) + .build() + .user_data(box_callback_as_u64(tx, async move |entry| { + let read_amt = handle_error(entry.result())?; + + let mut buf = buf.into_vec(); + buf.truncate(read_amt as _); + + Ok(buf.into_boxed_slice()) + })); + + unsafe { + p.uring.submission_shared().push(&entry).unwrap(); + } + + rx.recv().await.unwrap() +} + +pub type Platform = PlatformLinux; diff --git a/src/plat/mod.rs b/src/plat/mod.rs index 4da9482..a9ba9ab 100644 --- a/src/plat/mod.rs +++ b/src/plat/mod.rs @@ -1,4 +1,4 @@ -use std::path::PathBuf; +use std::{net::SocketAddr, path::PathBuf}; use crate::aliases::IoResult; @@ -6,7 +6,6 @@ pub(crate) mod linux; pub trait Platform { type NewOptions: Default; - type FileHandle; fn new(opts: Self::NewOptions) -> IoResult where @@ -14,11 +13,5 @@ pub trait Platform { async fn tick(&self) -> IoResult<()>; - async fn open_file(&self, path: PathBuf) -> IoResult; - async fn read( - &self, - f: &mut Self::FileHandle, - offset: usize, - buf: Box<[u8]>, - ) -> IoResult>; + type FileHandle; } diff --git a/test_data/goodbye.txt b/test_data/goodbye.txt new file mode 100644 index 0000000..0508bbb --- /dev/null +++ b/test_data/goodbye.txt @@ -0,0 +1 @@ +goodbye! diff --git a/test_data/hello.txt b/test_data/hello.txt new file mode 100644 index 0000000..4effa19 --- /dev/null +++ b/test_data/hello.txt @@ -0,0 +1 @@ +hello!