More changes

This commit is contained in:
soup 2024-10-19 17:00:04 -04:00
parent f2f668331d
commit 0db4c4fa9c
No known key found for this signature in database
8 changed files with 208 additions and 95 deletions

20
examples/tcp_echo.rs Normal file
View file

@ -0,0 +1,20 @@
use futures_lite::FutureExt;
use wove::Wove;
pub async fn go(wove: &Wove) -> std::io::Result<()> {
let listener = wove::net::TcpListener::bind(wove, "localhost:3000").await?;
todo!()
}
pub fn main() {
let wove = Wove::new().unwrap();
let fut = async {
let run = async { wove.run().await? };
let go = go(&wove);
go.race(run).await
};
pollster::block_on(fut).unwrap();
}

View file

@ -1,15 +1,15 @@
use std::path::PathBuf;
use crate::{aliases::IoResult, plat::Platform, Wove};
use crate::{aliases::IoResult, plat_impl, Wove};
pub async fn read_to_string(wove: &Wove, path: PathBuf) -> IoResult<String> {
let mut file = wove.platform.open_file(path).await?;
let mut file = plat_impl::open_file(&wove.platform, path).await?;
let mut out = Vec::new();
loop {
let buf = vec![0; 4096].into_boxed_slice();
let buf = wove.platform.read(&mut file, out.len(), buf).await?;
dbg!(buf.len());
let buf = plat_impl::read(&wove.platform, &mut file, out.len(), buf).await?;
if buf.len() == 0 {
break;
}

View file

@ -1,4 +1,5 @@
#![feature(async_closure)]
#![feature(never_type)]
mod aliases;
pub mod fs;
@ -6,23 +7,47 @@ pub mod net;
mod plat;
mod wove;
use std::{future::Future, pin::Pin, task::Poll};
use aliases::IoResult;
#[cfg(target_os = "linux")]
pub use plat::linux::PlatformLinux as PlatImpl;
use plat::Platform;
pub(crate) use plat::linux as plat_impl;
pub struct Tock(bool);
impl Tock {
fn new() -> Self {
Self(false)
}
}
impl Future for Tock {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
if self.0 {
return Poll::Ready(());
}
let this = unsafe { self.get_unchecked_mut() };
this.0 = true;
cx.waker().wake_by_ref();
Poll::Pending
}
}
pub struct Wove {
platform: PlatImpl,
platform: plat_impl::Platform,
}
impl Wove {
pub fn new() -> IoResult<Self> {
Ok(Self {
platform: PlatImpl::new(Default::default())?,
platform: plat_impl::Platform::new()?,
})
}
pub async fn run(&self) -> IoResult<()> {
pub async fn run(&self) -> IoResult<!> {
loop {
self.platform.tick().await?;
}
@ -34,6 +59,7 @@ mod test {
use std::path::PathBuf;
use crate::Wove;
use futures_lite::future::try_zip;
#[test]
fn sketch() {
@ -42,16 +68,21 @@ mod test {
let fut = async {
let run = async {
wove.run().await?;
Ok("".to_string())
};
let contents = crate::fs::read_to_string(wove, PathBuf::from("src/lib.rs"));
let contents = futures_lite::future::race(run, contents).await;
let hello = crate::fs::read_to_string(wove, PathBuf::from("test_data/hello.txt"));
let goodbye =
crate::fs::read_to_string(wove, PathBuf::from("test_data/goodbye.txt"));
let both = try_zip(hello, goodbye);
let contents = futures_lite::future::race(both, run).await;
contents
};
let out = pollster::block_on(fut).unwrap();
assert_eq!(out, "");
let (hello, goodbye) = pollster::block_on(fut).unwrap();
assert_eq!(hello, "hello!\n");
assert_eq!(goodbye, "goodbye!\n");
}
}

View file

@ -1 +1,25 @@
use std::net::ToSocketAddrs;
use crate::{aliases::IoResult, plat_impl, Wove};
pub struct TcpListener(plat_impl::FileHandle);
impl TcpListener {
pub async fn bind(wove: &Wove, addrs: impl ToSocketAddrs) -> IoResult<Self> {
// TODO(Blocking): to_socket_addrs can block
let mut last_err = None;
for addr in addrs.to_socket_addrs()? {
match plat_impl::open_tcp_socket(&wove.platform, addr).await {
Ok(v) => return Ok(TcpListener(v)),
Err(e) => last_err = Some(e),
}
}
if let Some(last_err) = last_err {
return Err(last_err);
}
Err(std::io::Error::other("No addrs returned"))
}
}

View file

@ -1,16 +1,31 @@
use std::{ffi::CString, future::Future, path::PathBuf, pin::Pin};
use std::{
ffi::CString, future::Future, net::IpAddr, net::SocketAddr, path::PathBuf, pin::Pin,
};
use io_uring::IoUring;
use crate::aliases::IoResult;
use super::Platform;
use crate::{aliases::IoResult, Tock};
pub type CallbackFuture = Pin<Box<dyn Future<Output = ()> + 'static>>;
pub type Callback = Box<dyn FnOnce(io_uring::cqueue::Entry) -> CallbackFuture + 'static>;
fn box_callback_as_u64(callback: impl async FnOnce(io_uring::cqueue::Entry) + 'static) -> u64 {
let bx: Callback = Box::new(move |entry| Box::pin(callback(entry)));
fn handle_error(i: i32) -> IoResult<i32> {
if i < 0 {
return Err(std::io::Error::from_raw_os_error(-i));
}
Ok(i)
}
fn box_callback_as_u64<T: 'static>(
tx: async_channel::Sender<IoResult<T>>,
callback: impl async FnOnce(io_uring::cqueue::Entry) -> IoResult<T> + 'static,
) -> u64 {
let bx: Callback = Box::new(move |entry| {
Box::pin(async move {
tx.send(callback(entry).await).await.unwrap();
})
});
let bx: Box<Callback> = Box::new(bx);
let leaked = Box::leak(bx);
let ptr = leaked as *mut _;
@ -26,39 +41,21 @@ unsafe fn u64_as_callback(v: u64) -> Box<Callback> {
pub struct PlatformLinux {
uring: io_uring::IoUring,
tx_should_tick: async_channel::Sender<()>,
rx_should_tick: async_channel::Receiver<()>,
_pd: core::marker::PhantomData<std::rc::Rc<()>>,
_pd: core::marker::PhantomData<std::cell::Cell<()>>,
}
pub type FileHandle = io_uring::types::Fd;
impl PlatformLinux {
fn set_should_tick(&self) {
self.tx_should_tick.force_send(()).unwrap();
}
async fn wait_tick(&self) {
self.rx_should_tick.recv().await.unwrap()
}
}
impl Platform for PlatformLinux {
type NewOptions = ();
type FileHandle = io_uring::types::Fd;
fn new(opts: Self::NewOptions) -> IoResult<Self> {
let (tx_should_tick, rx_should_tick) = async_channel::bounded(1);
pub fn new() -> IoResult<Self> {
Ok(Self {
uring: IoUring::new(256)?,
tx_should_tick,
rx_should_tick,
_pd: Default::default(),
})
}
async fn tick(&self) -> IoResult<()> {
self.wait_tick().await;
pub async fn tick(&self) -> IoResult<()> {
self.uring.submit()?;
let cq = unsafe { self.uring.completion_shared() };
@ -67,58 +64,104 @@ impl Platform for PlatformLinux {
cb(entry).await;
}
Tock::new().await;
Ok(())
}
}
async fn open_file(&self, path: PathBuf) -> IoResult<Self::FileHandle> {
let (tx, rx) = async_channel::bounded(1);
pub async fn open_file(p: &PlatformLinux, path: PathBuf) -> IoResult<FileHandle> {
let (tx, rx) = async_channel::bounded(1);
let path = CString::new(path.into_os_string().into_encoded_bytes())?;
let entry =
io_uring::opcode::OpenAt::new(io_uring::types::Fd(libc::AT_FDCWD), path.as_ptr())
.build()
.user_data(box_callback_as_u64(async move |entry| {
let fd = entry.result();
let fd = io_uring::types::Fd(fd);
tx.send(fd).await.unwrap();
std::mem::drop(path);
}));
unsafe {
self.uring.submission_shared().push(&entry).unwrap();
}
self.set_should_tick();
Ok(rx.recv().await.unwrap())
}
async fn read(
&self,
f: &mut Self::FileHandle,
offset: usize,
mut buf: Box<[u8]>,
) -> IoResult<Box<[u8]>> {
let (tx, rx) = async_channel::bounded(1);
let entry = io_uring::opcode::Read::new(*f, buf.as_mut_ptr(), buf.len() as _)
.offset(offset as _)
let path = CString::new(path.into_os_string().into_encoded_bytes())?;
let entry =
io_uring::opcode::OpenAt::new(io_uring::types::Fd(libc::AT_FDCWD), path.as_ptr())
.build()
.user_data(box_callback_as_u64(async move |entry| {
let read_amt = entry.result();
dbg!(read_amt);
.user_data(box_callback_as_u64(tx, async move |entry| {
let fd = handle_error(entry.result())?;
let fd = io_uring::types::Fd(fd);
let mut buf = buf.into_vec();
buf.truncate(read_amt as _);
std::mem::drop(path);
tx.send(buf.into_boxed_slice()).await.unwrap()
Ok(fd)
}));
unsafe {
self.uring.submission_shared().push(&entry).unwrap();
}
self.set_should_tick();
Ok(rx.recv().await.unwrap())
unsafe {
p.uring.submission_shared().push(&entry).unwrap();
}
rx.recv().await.unwrap()
}
pub async fn open_tcp_socket(
p: &PlatformLinux,
socket_addr: SocketAddr,
) -> IoResult<FileHandle> {
let (tx, rx) = async_channel::bounded(1);
let domain = match () {
_ if socket_addr.is_ipv4() => libc::AF_INET,
_ if socket_addr.is_ipv6() => libc::AF_INET6,
_ => return Err(std::io::Error::other("Unsupported domain")),
};
let entry = io_uring::opcode::Socket::new(domain, libc::SOCK_STREAM, 0)
.build()
.user_data(box_callback_as_u64(tx, async move |entry| {
let res = handle_error(entry.result())?;
let sock = libc::sockaddr_in {
sin_family: domain as _,
sin_port: socket_addr.port(),
sin_addr: libc::in_addr {
s_addr: match socket_addr.ip() {
IpAddr::V4(v) => v.to_bits(),
IpAddr::V6(v) => panic!(),
},
},
sin_zero: Default::default(),
};
// FIXME(Blocking)
let fd = handle_error(unsafe {
libc::bind(
res,
&sock as *const _ as *const _,
core::mem::size_of_val(&sock) as u32,
)
})?;
Ok(io_uring::types::Fd(fd))
}));
rx.recv().await.unwrap()
}
pub async fn read(
p: &PlatformLinux,
f: &mut FileHandle,
offset: usize,
mut buf: Box<[u8]>,
) -> IoResult<Box<[u8]>> {
let (tx, rx) = async_channel::bounded(1);
let entry = io_uring::opcode::Read::new(*f, buf.as_mut_ptr(), buf.len() as _)
.offset(offset as _)
.build()
.user_data(box_callback_as_u64(tx, async move |entry| {
let read_amt = handle_error(entry.result())?;
let mut buf = buf.into_vec();
buf.truncate(read_amt as _);
Ok(buf.into_boxed_slice())
}));
unsafe {
p.uring.submission_shared().push(&entry).unwrap();
}
rx.recv().await.unwrap()
}
pub type Platform = PlatformLinux;

View file

@ -1,4 +1,4 @@
use std::path::PathBuf;
use std::{net::SocketAddr, path::PathBuf};
use crate::aliases::IoResult;
@ -6,7 +6,6 @@ pub(crate) mod linux;
pub trait Platform {
type NewOptions: Default;
type FileHandle;
fn new(opts: Self::NewOptions) -> IoResult<Self>
where
@ -14,11 +13,5 @@ pub trait Platform {
async fn tick(&self) -> IoResult<()>;
async fn open_file(&self, path: PathBuf) -> IoResult<Self::FileHandle>;
async fn read(
&self,
f: &mut Self::FileHandle,
offset: usize,
buf: Box<[u8]>,
) -> IoResult<Box<[u8]>>;
type FileHandle;
}

1
test_data/goodbye.txt Normal file
View file

@ -0,0 +1 @@
goodbye!

1
test_data/hello.txt Normal file
View file

@ -0,0 +1 @@
hello!