From bdcf820ca700a81eb24b0f10f8849a462e604d29 Mon Sep 17 00:00:00 2001 From: soup Date: Sun, 20 Oct 2024 23:46:12 -0400 Subject: [PATCH] Big library rework, but good shit is happening --- Cargo.lock | 8 +- Cargo.toml | 28 +++- examples/tcp_echo.rs | 64 +++++++- src/io.rs | 19 +++ src/io_impl/io_impl.rs | 1 + src/io_impl/io_uring/mod.rs | 293 ++++++++++++++++++++++++++++++++++++ src/io_impl/mod.rs | 9 ++ src/lib.rs | 86 +---------- src/net.rs | 67 ++++++++- src/plat/linux.rs | 248 ++++++++++++++++++++---------- src/time.rs | 7 + 11 files changed, 647 insertions(+), 183 deletions(-) create mode 100644 src/io.rs create mode 100644 src/io_impl/io_impl.rs create mode 100644 src/io_impl/io_uring/mod.rs create mode 100644 src/io_impl/mod.rs create mode 100644 src/time.rs diff --git a/Cargo.lock b/Cargo.lock index 227d9dd..0105c79 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -122,12 +122,6 @@ version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" -[[package]] -name = "pollster" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22686f4785f02a4fcc856d3b3bb19bf6c8160d103f7a99cc258bddd0251dc7f2" - [[package]] name = "wove" version = "0.0.0" @@ -136,5 +130,5 @@ dependencies = [ "futures-lite", "io-uring", "libc", - "pollster", + "parking", ] diff --git a/Cargo.toml b/Cargo.toml index a9799dd..56f745b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,15 +4,27 @@ version = "0.0.0" edition = "2021" description = "wove" license = "AGPL-3.0-or-later" +resolver = "2" -[dependencies] - -[dev-dependencies] -pollster = { version = "0.3.0" } +[workspace.dependencies] futures-lite = { version = "2.3.0" } -[target.'cfg(target_os = "linux")'.dependencies] -io-uring = { version = '0.7.1' } -libc = { version = "0.2.161" } -async-channel = { version = "2.3.1" } +[dependencies] +io-uring = { version = '0.7.1', optional = true } +libc = { version = "0.2.161", optional = true } +futures-lite = { workspace = true, optional = true } +parking = { version = "2.2.1", optional = true } +async-channel = { version = "2.3.1", optional = true } +[dev-dependencies] +futures-lite = { workspace = true } + +[features] +default = ["io_uring"] +io_uring = [ + "dep:io-uring", + "dep:libc", + "dep:futures-lite", + "dep:parking", + "dep:async-channel", +] diff --git a/examples/tcp_echo.rs b/examples/tcp_echo.rs index 1966500..fa6a11c 100644 --- a/examples/tcp_echo.rs +++ b/examples/tcp_echo.rs @@ -1,10 +1,62 @@ -use futures_lite::FutureExt; -use wove::Wove; +/* + +use std::cell::RefCell; + +use futures_lite::{FutureExt, StreamExt as FlStreamExt}; + +use futures_util::{ + select, stream::FuturesUnordered, FutureExt as FuFutureExt, StreamExt as FuStreamExt, +}; +use wove::{io::AsyncRead, io::AsyncWrite, Wove}; pub async fn go(wove: &Wove) -> std::io::Result<()> { - let listener = wove::net::TcpListener::bind(wove, "localhost:3000").await?; + let mut listener = wove::net::TcpListener::bind(wove, "127.0.0.1:4838").await?; + let mut incoming = listener.incoming(wove); - todo!() + let connections = RefCell::new(FuturesUnordered::new()); + + let mut accept = Box::pin( + async { + while let Some(connection) = FlStreamExt::next(&mut incoming).await { + let mut connection = connection?; + connections.borrow_mut().push(async move { + loop { + let data = connection + .read(wove, vec![0; 4096].into_boxed_slice()) + .await?; + if data.len() == 0 { + break; + } + + connection.write(wove, data).await?; + } + + std::io::Result::Ok(()) + }); + } + + std::io::Result::Ok(()) + } + .fuse(), + ); + + let mut handle = Box::pin( + async { + while let Some(out) = FuStreamExt::next(&mut *connections.borrow_mut()).await { + out?; + } + + std::io::Result::Ok(()) + } + .fuse(), + ); + + loop { + select! { + res = handle => res?, + res = accept => res?, + } + } } pub fn main() { @@ -18,3 +70,7 @@ pub fn main() { pollster::block_on(fut).unwrap(); } +*/ + +pub fn main() { +} diff --git a/src/io.rs b/src/io.rs new file mode 100644 index 0000000..a72c2d3 --- /dev/null +++ b/src/io.rs @@ -0,0 +1,19 @@ +use std::future::Future; + +use crate::{aliases::IoResult, io_impl::IoImpl}; + +pub trait AsyncRead { + fn read( + &mut self, + io: &I, + buf: Box<[u8]>, + ) -> impl Future>>; +} + +pub trait AsyncWrite { + fn write( + &mut self, + io: &I, + buf: Box<[u8]>, + ) -> impl Future>; +} diff --git a/src/io_impl/io_impl.rs b/src/io_impl/io_impl.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/io_impl/io_impl.rs @@ -0,0 +1 @@ + diff --git a/src/io_impl/io_uring/mod.rs b/src/io_impl/io_uring/mod.rs new file mode 100644 index 0000000..d799b17 --- /dev/null +++ b/src/io_impl/io_uring/mod.rs @@ -0,0 +1,293 @@ +use std::{ + future::Future, + sync::atomic::{AtomicU64, Ordering}, + task::{Context, Poll, Waker}, +}; + +use parking::Parker; + +use crate::aliases::IoResult; + +use super::IoImpl; + +type CqueueEntryReceiver = async_channel::Receiver; +type CqueueEntrySender = async_channel::Sender; +pub struct UserData { + tx: CqueueEntrySender, + persist: bool, +} +impl UserData { + fn new_boxed(tx: CqueueEntrySender, persist: bool) -> Box { + Box::new(Self { tx, persist }) + } + + fn into_u64(self: Box) -> u64 { + Box::leak(self) as *mut _ as _ + } + + unsafe fn from_u64(v: u64) -> Box { + let v = v as *mut UserData; + + unsafe { Box::from_raw(v) } + } +} + +fn handle_error(i: i32) -> IoResult { + if i < 0 { + return Err(std::io::Error::from_raw_os_error(-i)); + } + + Ok(i) +} + +pub struct Tick { + did_handle: bool, + submitted: usize, + active_completions: usize, +} + +pub struct IoUring { + uring: io_uring::IoUring, + + // TODO: analyze the atomic orderings passed to the atomic operations here + // to make sure they make sense + + // TODO: I'm not sure that atomics even make sense here, but they let this + // method work with &self instead of &mut self so :shrug: + active_completions: AtomicU64, + + _pd: core::marker::PhantomData>, +} +impl IoUring { + pub fn new() -> IoResult { + let uring = io_uring::IoUring::new(256)?; + + Ok(Self { + uring, + active_completions: AtomicU64::new(0), + + _pd: Default::default(), + }) + } + + pub fn submit(&self, wait_for: usize) -> IoResult { + let submitted_count = self.uring.submit_and_wait(wait_for)?; + self.active_completions + .fetch_add(submitted_count as u64, Ordering::Relaxed); + + Ok(submitted_count) + } + + /// Returns the current number of active requests, and a boolean indicating + /// if we actually handled any events. + pub fn poll(&self) -> IoResult<(usize, bool)> { + let mut did_handle = false; + // SAFETY: + // - this method is synchronous, and `cq` is dropped at the end of the scope + // - [`IoUring`] is !Sync, so it should be impossible for 2 threads to be + // running this at the same time + let cq = unsafe { self.uring.completion_shared() }; + for entry in cq { + did_handle = true; + + let ud = unsafe { UserData::from_u64(entry.user_data()) }; + ud.tx.send_blocking(entry).unwrap(); + if ud.persist { + Box::leak(ud); + } else { + self.active_completions.fetch_sub(1, Ordering::Relaxed); + } + } + + Ok(( + self.active_completions.load(Ordering::Relaxed) as usize, + did_handle, + )) + } + + pub fn tick(&self) -> IoResult { + let submitted = self.submit(0)?; + let (active_completions, did_handle) = self.poll()?; + + Ok(Tick { + active_completions, + did_handle, + submitted, + }) + } + + /// Runs a future. Conceptually, this is similar to [`Self::block_on`], but + /// instead of acting as its own executor, this allows us to be embedded in + /// another runtime + pub async fn run(&self, fut: F, behavior: ActiveRequestBehavior) -> Run { + Run(&self, fut, behavior) + } + + pub fn block_on(&self, fut: F) -> F::Output { + futures_lite::pin!(fut); + + let parker = Parker::new(); + let unparker = parker.unparker(); + let waker = Waker::from(unparker); + + let cx = &mut Context::from_waker(&waker); + + loop { + // Check if the future is ready. If so, return the value. + if let Poll::Ready(v) = fut.as_mut().poll(cx) { + return v; + } + + let Tick { + did_handle, + active_completions, + submitted, + } = self.tick().unwrap(); + + if did_handle { + // If we handled an event, it's likely that our future can make progress, + // so continue the loop + continue; + } + + if submitted > 0 { + // We submitted an event. It's possible that our future can make progress + // once we poll, so continue the loop + continue; + } + + if active_completions > 0 { + // We didn't submit an event, but we do have completions in-flight. + // We should block until one of them completes, and then re-poll + self.submit(1).unwrap(); + continue; + } + + // If we've gotten to this point, it's likely that we're waiting on a + // future that depends on another thread to make progress. In that case, + // park the current thread until the waker is called. + parker.park(); + } + } +} + +impl IoImpl for IoUring { + async fn sleep(&self, duration: std::time::Duration) -> IoResult<()> { + let (tx, rx) = async_channel::bounded(1); + + let ts = io_uring::types::Timespec::new() + .sec(duration.as_secs()) + .nsec(duration.subsec_nanos()); + let entry = io_uring::opcode::Timeout::new(&ts as *const _) + .build() + .user_data(UserData::new_boxed(tx, false).into_u64()); + + unsafe { self.uring.submission_shared().push(&entry).unwrap() } + + let entry = rx.recv().await.unwrap(); + handle_error(entry.result())?; + + Ok(()) + } +} + +/// Behavior used in [`Run`] when the future is not ready, the last [`IoUring::tick`] +/// didn't submit or handle any events, and there are requests in flight +#[derive(Copy, Clone, Debug)] +pub enum ActiveRequestBehavior { + /// Cause a panic + Panic, + + /// Block until a completion is returned + Block, + + /// Return pending. + /// + ///
+ /// + /// NOTE: this relies on the fact that the executor is going to poll this + /// _eventually_. If it doesn't, it's likely that you'll get a deadlock. + /// + ///
+ Pending, +} + +pub struct Run<'a, F: Future>(&'a IoUring, F, ActiveRequestBehavior); +impl Future for Run<'_, F> { + type Output = F::Output; + + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let io = self.0; + let behavior = self.2; + let fut = unsafe { self.map_unchecked_mut(|s| &mut s.1) }; + + if let Poll::Ready(out) = fut.poll(cx) { + return Poll::Ready(out); + } + + let Tick { + did_handle, + submitted, + active_completions, + } = io.tick().unwrap(); + + if did_handle { + // We handled an event, it's likely that the future can make progress + cx.waker().wake_by_ref(); + return Poll::Pending; + } + + if submitted > 0 { + // We submitted an event, it's possible that the future can make progress, + // so we should wake the executor and get re-polled + cx.waker().wake_by_ref(); + return Poll::Pending; + } + + if active_completions > 0 { + // We have completions in flight, but they're not ready yet. + match behavior { + ActiveRequestBehavior::Panic => { + panic!("The future was not ready, and there are completions in-flight") + }, + ActiveRequestBehavior::Block => { + io.submit(1).unwrap(); + return Poll::Pending; + }, + ActiveRequestBehavior::Pending => { + return Poll::Pending; + }, + } + } + + // The future likely depends on another thread, so return pending + Poll::Pending + } +} + +#[cfg(test)] +mod test { + mod block_on { + use std::time::Duration; + + use crate::io_impl::io_uring::IoUring; + + #[test] + fn simple() { + let uring = IoUring::new().unwrap(); + let out = uring.block_on(async { 5 }); + assert_eq!(out, 5); + } + + #[test] + fn timer() { + let uring = &IoUring::new().unwrap(); + let out = uring.block_on(async { + crate::time::sleep(uring, Duration::from_secs(1)).await; + 5 + }); + + assert_eq!(out, 5); + } + } +} diff --git a/src/io_impl/mod.rs b/src/io_impl/mod.rs new file mode 100644 index 0000000..7d62027 --- /dev/null +++ b/src/io_impl/mod.rs @@ -0,0 +1,9 @@ +use crate::aliases::IoResult; +use std::time::Duration; + +#[cfg(feature = "io_uring")] +pub mod io_uring; + +pub trait IoImpl { + async fn sleep(&self, duration: Duration) -> IoResult<()>; +} diff --git a/src/lib.rs b/src/lib.rs index 0606162..0876e8f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,88 +1,14 @@ #![feature(async_closure)] +#![feature(async_iterator)] #![feature(never_type)] mod aliases; -pub mod fs; -pub mod net; -mod plat; -mod wove; +// pub mod fs; +pub mod io; +pub mod io_impl; +// pub mod net; +pub mod time; use std::{future::Future, pin::Pin, task::Poll}; use aliases::IoResult; -#[cfg(target_os = "linux")] -pub(crate) use plat::linux as plat_impl; - -pub struct Tock(bool); -impl Tock { - fn new() -> Self { - Self(false) - } -} - -impl Future for Tock { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { - if self.0 { - return Poll::Ready(()); - } - - let this = unsafe { self.get_unchecked_mut() }; - this.0 = true; - cx.waker().wake_by_ref(); - - Poll::Pending - } -} - -pub struct Wove { - platform: plat_impl::Platform, -} - -impl Wove { - pub fn new() -> IoResult { - Ok(Self { - platform: plat_impl::Platform::new()?, - }) - } - - pub async fn run(&self) -> IoResult { - loop { - self.platform.tick().await?; - } - } -} - -#[cfg(test)] -mod test { - use std::path::PathBuf; - - use crate::Wove; - use futures_lite::future::try_zip; - - #[test] - fn sketch() { - let mut wove = Wove::new().unwrap(); - let wove = &mut wove; - let fut = async { - let run = async { - wove.run().await?; - }; - - let hello = crate::fs::read_to_string(wove, PathBuf::from("test_data/hello.txt")); - let goodbye = - crate::fs::read_to_string(wove, PathBuf::from("test_data/goodbye.txt")); - let both = try_zip(hello, goodbye); - - let contents = futures_lite::future::race(both, run).await; - - contents - }; - - let (hello, goodbye) = pollster::block_on(fut).unwrap(); - - assert_eq!(hello, "hello!\n"); - assert_eq!(goodbye, "goodbye!\n"); - } -} diff --git a/src/net.rs b/src/net.rs index a523656..4defd88 100644 --- a/src/net.rs +++ b/src/net.rs @@ -1,6 +1,34 @@ -use std::net::ToSocketAddrs; +use std::{async_iter::AsyncIterator, net::ToSocketAddrs, pin::Pin}; -use crate::{aliases::IoResult, plat_impl, Wove}; +use futures_lite::Stream; + +use crate::{ + aliases::IoResult, + io::{AsyncRead, AsyncWrite}, + plat_impl, Wove, +}; + +pub struct TcpStream(pub(crate) plat_impl::FileHandle); + +impl AsyncRead for TcpStream { + fn read( + &mut self, + wove: &Wove, + buf: Box<[u8]>, + ) -> impl std::future::Future>> { + plat_impl::read(&wove.platform, &mut self.0, 0, buf) + } +} + +impl AsyncWrite for TcpStream { + fn write( + &mut self, + wove: &Wove, + buf: Box<[u8]>, + ) -> impl std::future::Future> { + plat_impl::write(&wove.platform, &mut self.0, 0, buf) + } +} pub struct TcpListener(plat_impl::FileHandle); @@ -22,4 +50,39 @@ impl TcpListener { Err(std::io::Error::other("No addrs returned")) } + + pub fn incoming<'a>(&'a mut self, wove: &'a Wove) -> Incoming<'a> { + Incoming::register(wove, self) + } +} + +pub struct Incoming<'a>(plat_impl::Incoming<'a>); +impl<'a> Incoming<'a> { + fn register(wove: &'a Wove, listener: &'a mut TcpListener) -> Self { + Incoming(plat_impl::accept_many(&wove.platform, &mut listener.0)) + } +} + +impl AsyncIterator for Incoming<'_> { + type Item = IoResult; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let inner = unsafe { self.map_unchecked_mut(|s| &mut s.0) }; + + AsyncIterator::poll_next(inner, cx) + } +} + +impl Stream for Incoming<'_> { + type Item = IoResult; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + AsyncIterator::poll_next(self, cx) + } } diff --git a/src/plat/linux.rs b/src/plat/linux.rs index a32280b..72dc6b1 100644 --- a/src/plat/linux.rs +++ b/src/plat/linux.rs @@ -1,43 +1,17 @@ use std::{ - ffi::CString, future::Future, net::IpAddr, net::SocketAddr, path::PathBuf, pin::Pin, + async_iter::AsyncIterator, + ffi::CString, + future::Future, + net::{IpAddr, SocketAddr}, + os::fd::IntoRawFd, + path::PathBuf, + pin::Pin, }; +use futures_lite::Stream; use io_uring::IoUring; -use crate::{aliases::IoResult, Tock}; - -pub type CallbackFuture = Pin + 'static>>; -pub type Callback = Box CallbackFuture + 'static>; - -fn handle_error(i: i32) -> IoResult { - if i < 0 { - return Err(std::io::Error::from_raw_os_error(-i)); - } - - Ok(i) -} - -fn box_callback_as_u64( - tx: async_channel::Sender>, - callback: impl async FnOnce(io_uring::cqueue::Entry) -> IoResult + 'static, -) -> u64 { - let bx: Callback = Box::new(move |entry| { - Box::pin(async move { - tx.send(callback(entry).await).await.unwrap(); - }) - }); - let bx: Box = Box::new(bx); - let leaked = Box::leak(bx); - let ptr = leaked as *mut _; - - ptr as u64 -} - -unsafe fn u64_as_callback(v: u64) -> Box { - let v = v as *mut Callback; - - unsafe { Box::from_raw(v) } -} +use crate::{aliases::IoResult, net::TcpStream}; pub struct PlatformLinux { uring: io_uring::IoUring, @@ -60,12 +34,13 @@ impl PlatformLinux { let cq = unsafe { self.uring.completion_shared() }; for entry in cq { - let cb = unsafe { u64_as_callback(entry.user_data()) }; - cb(entry).await; + let ud = unsafe { UserData::from_u64(entry.user_data()) }; + ud.tx.send(entry).await.unwrap(); + if ud.persist { + Box::leak(ud); + } } - Tock::new().await; - Ok(()) } } @@ -77,27 +52,33 @@ pub async fn open_file(p: &PlatformLinux, path: PathBuf) -> IoResult let entry = io_uring::opcode::OpenAt::new(io_uring::types::Fd(libc::AT_FDCWD), path.as_ptr()) .build() - .user_data(box_callback_as_u64(tx, async move |entry| { - let fd = handle_error(entry.result())?; - let fd = io_uring::types::Fd(fd); - - std::mem::drop(path); - - Ok(fd) - })); + .user_data(UserData::new_boxed(tx, false).into_u64()); unsafe { p.uring.submission_shared().push(&entry).unwrap(); } - rx.recv().await.unwrap() + let entry = rx.recv().await.unwrap(); + let fd = handle_error(entry.result())?; + let fd = io_uring::types::Fd(fd); + + Ok(fd) } pub async fn open_tcp_socket( p: &PlatformLinux, socket_addr: SocketAddr, ) -> IoResult { - let (tx, rx) = async_channel::bounded(1); + // FIXME(Blocking) + // There is some some magic missing in the commented out code to make the + // socket clean up properly on process exit and whatnot. For now, just use + // the std implementation and cast it to a FileHandle + + let listener = std::net::TcpListener::bind(socket_addr)?; + Ok(io_uring::types::Fd(listener.into_raw_fd())) + + /* + // let (tx, rx) = async_channel::bounded(1); let domain = match () { _ if socket_addr.is_ipv4() => libc::AF_INET, @@ -107,34 +88,42 @@ pub async fn open_tcp_socket( let entry = io_uring::opcode::Socket::new(domain, libc::SOCK_STREAM, 0) .build() - .user_data(box_callback_as_u64(tx, async move |entry| { - let res = handle_error(entry.result())?; + .user_data(UserData::new_boxed(tx, false).into_u64()); - let sock = libc::sockaddr_in { - sin_family: domain as _, - sin_port: socket_addr.port(), - sin_addr: libc::in_addr { - s_addr: match socket_addr.ip() { - IpAddr::V4(v) => v.to_bits(), - IpAddr::V6(v) => panic!(), - }, - }, - sin_zero: Default::default(), - }; + unsafe { + p.uring.submission_shared().push(&entry).unwrap(); + } - // FIXME(Blocking) - let fd = handle_error(unsafe { - libc::bind( - res, - &sock as *const _ as *const _, - core::mem::size_of_val(&sock) as u32, - ) - })?; + let entry = rx.recv().await.unwrap(); - Ok(io_uring::types::Fd(fd)) - })); + let fd = handle_error(entry.result())?; - rx.recv().await.unwrap() + let sock = libc::sockaddr_in { + sin_family: domain as _, + sin_port: socket_addr.port().to_be(), + sin_addr: libc::in_addr { + s_addr: match socket_addr.ip() { + IpAddr::V4(v) => v.to_bits().to_be(), + IpAddr::V6(_) => panic!(), + }, + }, + sin_zero: Default::default(), + }; + + // FIXME(Blocking) + handle_error(unsafe { + libc::bind( + fd, + &sock as *const _ as *const _, + core::mem::size_of_val(&sock) as u32, + ) + })?; + + // FIXME(Blocking) + handle_error(unsafe { libc::listen(fd, libc::SOMAXCONN) })?; + + Ok(io_uring::types::Fd(fd)) + */ } pub async fn read( @@ -148,20 +137,115 @@ pub async fn read( let entry = io_uring::opcode::Read::new(*f, buf.as_mut_ptr(), buf.len() as _) .offset(offset as _) .build() - .user_data(box_callback_as_u64(tx, async move |entry| { - let read_amt = handle_error(entry.result())?; - - let mut buf = buf.into_vec(); - buf.truncate(read_amt as _); - - Ok(buf.into_boxed_slice()) - })); + .user_data(UserData::new_boxed(tx, false).into_u64()); unsafe { p.uring.submission_shared().push(&entry).unwrap(); } - rx.recv().await.unwrap() + let entry = rx.recv().await.unwrap(); + let read_amt = handle_error(entry.result())?; + + let mut buf = buf.into_vec(); + buf.truncate(read_amt as _); + + Ok(buf.into_boxed_slice()) +} + +pub async fn write( + p: &PlatformLinux, + f: &mut FileHandle, + offset: usize, + mut buf: Box<[u8]>, +) -> IoResult { + let (tx, rx) = async_channel::bounded(1); + + let entry = io_uring::opcode::Write::new(*f, buf.as_mut_ptr(), buf.len() as _) + .offset(offset as _) + .build() + .user_data(UserData::new_boxed(tx, false).into_u64()); + + unsafe { + p.uring.submission_shared().push(&entry).unwrap(); + } + + let entry = rx.recv().await.unwrap(); + let write_amt = handle_error(entry.result())?; + + Ok(write_amt as usize) +} + +pub(crate) struct Incoming<'a> { + plat: &'a PlatformLinux, + rx: Pin>, + cb_id: u64, +} + +pub fn accept_many<'a>(p: &'a PlatformLinux, f: &mut FileHandle) -> Incoming<'a> { + let (tx, rx) = async_channel::unbounded(); + + let cb_id = UserData::new_boxed(tx, true).into_u64(); + + let entry = io_uring::opcode::AcceptMulti::new(*f) + .build() + .user_data(cb_id); + + unsafe { + p.uring.submission_shared().push(&entry).unwrap(); + } + + Incoming { + plat: p, + rx: Box::pin(rx), + cb_id, + } +} + +pub fn cancel(p: &PlatformLinux, id: u64) { + let entry = io_uring::opcode::AsyncCancel::new(id).build(); + + unsafe { + p.uring.submission_shared().push(&entry).unwrap(); + } +} + +impl AsyncIterator for Incoming<'_> { + type Item = IoResult; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let rx = unsafe { self.map_unchecked_mut(|s| &mut s.rx) }; + let mut fut = rx.recv(); + let pinned = unsafe { Pin::new_unchecked(&mut fut) }; + + pinned + .poll(cx) + .map(|entry| { + let fd = handle_error(entry.unwrap().result())?; + + Ok(crate::net::TcpStream(io_uring::types::Fd(fd))) + }) + .map(Some) + } +} + +impl Stream for Incoming<'_> { + type Item = IoResult; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + AsyncIterator::poll_next(self, cx) + } +} + +impl Drop for Incoming<'_> { + fn drop(&mut self) { + cancel(self.plat, self.cb_id); + } } pub type Platform = PlatformLinux; diff --git a/src/time.rs b/src/time.rs new file mode 100644 index 0000000..16a9541 --- /dev/null +++ b/src/time.rs @@ -0,0 +1,7 @@ +use std::time::Duration; + +use crate::io_impl::IoImpl; + +pub async fn sleep(io: &I, duration: Duration) { + let _ = io.sleep(duration).await; +}