Compare commits

...

2 commits

Author SHA1 Message Date
soup 8c5b34f107
More reworking 2024-10-21 23:38:48 -04:00
soup 287d3d4933
. 2024-10-21 19:35:35 -04:00
8 changed files with 298 additions and 187 deletions

11
Cargo.lock generated
View file

@ -3,14 +3,13 @@
version = 4
[[package]]
name = "async-channel"
version = "2.3.1"
name = "async-lock"
version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a"
checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18"
dependencies = [
"concurrent-queue",
"event-listener",
"event-listener-strategy",
"futures-core",
"pin-project-lite",
]
@ -126,7 +125,7 @@ checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02"
name = "wove"
version = "0.0.0"
dependencies = [
"async-channel",
"async-lock",
"futures-lite",
"io-uring",
"libc",

View file

@ -14,7 +14,7 @@ io-uring = { version = '0.7.1', optional = true }
libc = { version = "0.2.161", optional = true }
futures-lite = { workspace = true, optional = true }
parking = { version = "2.2.1", optional = true }
async-channel = { version = "2.3.1", optional = true }
async-lock = { version = "3.4.0", optional = true }
[dev-dependencies]
futures-lite = { workspace = true }
@ -26,5 +26,5 @@ io_uring = [
"dep:libc",
"dep:futures-lite",
"dep:parking",
"dep:async-channel",
"dep:async-lock",
]

View file

@ -1,11 +1,70 @@
use crate::aliases::IoResult;
use std::future::Future;
use crate::{aliases::IoResult, io_impl::IoImpl};
pub trait AsyncRead {
fn read(&mut self, buf: &mut [u8]) -> impl Future<Output = IoResult<usize>>;
pub trait BufferMut {
fn as_mut_ptr(&mut self) -> *mut u8;
fn writable_bytes(&self) -> usize;
}
pub trait AsyncWrite {
fn write(&mut self, buf: &[u8]) -> impl Future<Output = IoResult<usize>>;
impl BufferMut for Vec<u8> {
fn as_mut_ptr(&mut self) -> *mut u8 {
self.as_mut_ptr()
}
fn writable_bytes(&self) -> usize {
self.len()
}
}
impl BufferMut for Box<[u8]> {
fn as_mut_ptr(&mut self) -> *mut u8 {
self[..].as_mut_ptr()
}
fn writable_bytes(&self) -> usize {
self.len()
}
}
pub trait AsyncReadLoan {
fn read<B: BufferMut>(&mut self, buf: B) -> impl Future<Output = (B, IoResult<usize>)>;
}
pub trait Buffer {
fn as_ptr(&self) -> *const u8;
fn readable_bytes(&self) -> usize;
}
impl Buffer for Vec<u8> {
fn as_ptr(&self) -> *const u8 {
self.as_ptr()
}
fn readable_bytes(&self) -> usize {
self.len()
}
}
impl Buffer for Box<[u8]> {
fn as_ptr(&self) -> *const u8 {
self[..].as_ptr()
}
fn readable_bytes(&self) -> usize {
self.len()
}
}
impl Buffer for &'static [u8] {
fn as_ptr(&self) -> *const u8 {
self[..].as_ptr()
}
fn readable_bytes(&self) -> usize {
self.len()
}
}
pub trait AsyncWriteLoan {
fn write<B: Buffer>(&mut self, buf: B) -> impl Future<Output = (B, IoResult<usize>)>;
}

View file

@ -1,36 +1,78 @@
use std::{
async_iter::AsyncIterator,
future::Future,
net::SocketAddr,
ops::Deref,
os::fd::{FromRawFd, IntoRawFd},
pin::Pin,
sync::atomic::{AtomicU64, Ordering},
rc::Rc,
sync::atomic::{AtomicI32, AtomicU64, Ordering},
task::{Context, Poll, Waker},
};
use futures_lite::Stream;
use async_lock::{futures::BarrierWait, Barrier};
use futures_lite::{pin, Stream};
use parking::Parker;
use crate::aliases::IoResult;
use crate::{
aliases::IoResult,
io::{AsyncReadLoan, AsyncWriteLoan},
};
use super::IoImpl;
type CqueueEntryReceiver = async_channel::Receiver<io_uring::cqueue::Entry>;
type CqueueEntrySender = async_channel::Sender<io_uring::cqueue::Entry>;
pub struct UserData {
tx: CqueueEntrySender,
persist: bool,
#[derive(Debug)]
struct ResultBarrier {
result: AtomicI32,
barrier: Barrier,
}
impl UserData {
fn new_boxed(tx: CqueueEntrySender, persist: bool) -> Box<Self> {
Box::new(Self { tx, persist })
impl ResultBarrier {
fn new() -> Self {
Self {
result: AtomicI32::new(0),
barrier: Barrier::new(2),
}
}
async fn wait(&self) {
self.barrier.wait().await;
}
fn result(&self) -> i32 {
self.result.load(Ordering::Relaxed)
}
fn set_result_and_block(&self, v: i32) {
self.result.store(v, Ordering::Relaxed);
self.barrier.wait_blocking();
}
async fn wait_result(&self) -> i32 {
self.wait().await;
self.result()
}
}
#[derive(Debug)]
pub struct UserData<'a> {
persist: bool,
rb: &'a ResultBarrier,
}
impl<'a> UserData<'a> {
fn new_boxed(rb: &'a ResultBarrier, persist: bool) -> Box<Self> {
Box::new(Self { rb, persist })
}
fn new_into_u64(rb: &'a ResultBarrier, persist: bool) -> u64 {
Self::new_boxed(rb, persist).into_u64()
}
fn into_u64(self: Box<Self>) -> u64 {
Box::leak(self) as *mut _ as _
}
unsafe fn from_u64(v: u64) -> Box<UserData> {
unsafe fn from_u64(v: u64) -> Box<UserData<'a>> {
let v = v as *mut UserData;
unsafe { Box::from_raw(v) }
@ -51,7 +93,22 @@ pub struct Tick {
active_completions: usize,
}
pub struct IoUring {
#[derive(Clone)]
pub struct IoUring(Rc<IoUringInner>);
impl IoUring {
pub fn new() -> IoResult<Self> {
Ok(Self(Rc::new(IoUringInner::new()?)))
}
}
impl Deref for IoUring {
type Target = IoUringInner;
fn deref(&self) -> &Self::Target {
&self.0
}
}
pub struct IoUringInner {
uring: io_uring::IoUring,
// TODO: analyze the atomic orderings passed to the atomic operations here
@ -63,7 +120,7 @@ pub struct IoUring {
_pd: core::marker::PhantomData<std::cell::RefCell<()>>,
}
impl IoUring {
impl IoUringInner {
pub fn new() -> IoResult<Self> {
let uring = io_uring::IoUring::new(256)?;
@ -75,6 +132,33 @@ impl IoUring {
})
}
/// Cancel all events for the given fd. Does not return anything, and
/// cancellations are made on a best-effort basis
fn cancel_fd(&self, fd: io_uring::types::Fd) {
let rb = ResultBarrier::new();
let entry =
io_uring::opcode::AsyncCancel2::new(io_uring::types::CancelBuilder::fd(fd).all())
.build()
.user_data(UserData::new_into_u64(&rb, false));
self.queue_op(entry);
}
fn queue_op(&self, op: io_uring::squeue::Entry) {
unsafe { self.uring.submission_shared().push(&op).unwrap() }
}
async fn wait_op(&self, op: io_uring::squeue::Entry) -> IoResult<i32> {
let rb = ResultBarrier::new();
let entry = op.user_data(UserData::new_into_u64(&rb, false));
self.queue_op(entry);
handle_error(rb.wait_result().await)
}
pub fn submit(&self, wait_for: usize) -> IoResult<usize> {
let submitted_count = self.uring.submit_and_wait(wait_for)?;
self.active_completions
@ -102,7 +186,7 @@ impl IoUring {
}
let ud = unsafe { UserData::from_u64(entry.user_data()) };
ud.tx.send_blocking(entry).unwrap();
ud.rb.set_result_and_block(entry.result());
if ud.persist {
Box::leak(ud);
} else {
@ -189,33 +273,69 @@ impl Drop for TcpListener {
}
}
pub struct TcpStream(io_uring::types::Fd);
pub struct TcpStream {
uring: IoUring,
fd: io_uring::types::Fd,
}
impl Drop for TcpStream {
fn drop(&mut self) {
unsafe { std::net::TcpStream::from_raw_fd(self.0 .0) };
self.uring.cancel_fd(self.fd);
unsafe { std::net::TcpStream::from_raw_fd(self.fd.0) };
}
}
impl AsyncReadLoan for TcpStream {
async fn read<B: crate::io::BufferMut>(&mut self, mut buf: B) -> (B, IoResult<usize>) {
let res = self
.uring
.0
.wait_op(
io_uring::opcode::Read::new(
self.fd,
buf.as_mut_ptr(),
buf.writable_bytes() as u32,
)
.build(),
)
.await
.map(|v| v as usize);
(buf, res)
}
}
impl AsyncWriteLoan for TcpStream {
async fn write<B: crate::io::Buffer>(&mut self, buf: B) -> (B, IoResult<usize>) {
let res = self
.uring
.0
.wait_op(
io_uring::opcode::Write::new(
self.fd,
buf.as_ptr(),
buf.readable_bytes() as u32,
)
.build(),
)
.await
.map(|v| v as usize);
(buf, res)
}
}
impl IoImpl for IoUring {
async fn sleep(&self, duration: std::time::Duration) -> IoResult<()> {
let (tx, rx) = async_channel::bounded(1);
let ts = io_uring::types::Timespec::new()
.sec(duration.as_secs())
.nsec(duration.subsec_nanos());
let entry = io_uring::opcode::Timeout::new(&ts as *const _)
.build()
.user_data(UserData::new_boxed(tx, false).into_u64());
let entry = io_uring::opcode::Timeout::new(&ts as *const _).build();
unsafe { self.uring.submission_shared().push(&entry).unwrap() }
let entry = rx.recv().await.unwrap();
handle_error(entry.result())?;
let _ = self.0.wait_op(entry).await;
Ok(())
}
type TcpListener = io_uring::types::Fd;
type TcpListener = TcpListener;
fn open_tcp_socket(
&self,
addr: std::net::SocketAddr,
@ -226,7 +346,7 @@ impl IoImpl for IoUring {
// the std implementation and cast it to a FileHandle
let listener = std::net::TcpListener::bind(addr);
async { IoResult::Ok(io_uring::types::Fd(listener?.into_raw_fd())) }
async { IoResult::Ok(TcpListener(io_uring::types::Fd(listener?.into_raw_fd()))) }
/*
// let (tx, rx) = async_channel::bounded(1);
@ -279,7 +399,7 @@ impl IoImpl for IoUring {
async fn listener_local_addr(&self, listener: &Self::TcpListener) -> IoResult<SocketAddr> {
// FIXME(Blocking)
let listener = unsafe { std::net::TcpListener::from_raw_fd(listener.0) };
let listener = unsafe { std::net::TcpListener::from_raw_fd(listener.0 .0) };
let addr = listener.local_addr()?;
let _ = listener.into_raw_fd();
@ -291,45 +411,32 @@ impl IoImpl for IoUring {
&self,
listener: &mut Self::TcpListener,
) -> impl Future<Output = IoResult<Self::Incoming>> {
let (tx, rx) = async_channel::unbounded();
let rb = Box::pin(ResultBarrier::new());
let cb_id = UserData::new_boxed(tx, true).into_u64();
let cb_id = UserData::new_into_u64(&rb, true);
let entry = io_uring::opcode::AcceptMulti::new(*listener)
let entry = io_uring::opcode::AcceptMulti::new(listener.0)
.build()
.user_data(cb_id);
unsafe {
self.uring.submission_shared().push(&entry).unwrap();
self.0.uring.submission_shared().push(&entry).unwrap();
}
let wait = unsafe {
core::mem::transmute::<BarrierWait<'_>, BarrierWait<'_>>(rb.barrier.wait())
};
async move {
Ok(Incoming {
io: self as *const _,
rx: Box::pin(rx),
cb_id,
uring: self.clone(),
rb,
wait,
fd: listener.0,
})
}
}
type TcpStream = TcpStream;
async fn tcp_read(&self, stream: &mut Self::TcpStream, buf: &mut [u8]) -> IoResult<usize> {
let (tx, rx) = async_channel::bounded(1);
let entry = io_uring::opcode::Read::new(stream.0, buf.as_mut_ptr(), buf.len() as u32)
.build()
.user_data(UserData::new_boxed(tx, false).into_u64());
unsafe {
self.uring.submission_shared().push(&entry).unwrap();
}
let entry = rx.recv().await.unwrap();
let read_amt = handle_error(entry.result())?;
Ok(read_amt as usize)
}
fn tcp_connect(
&self,
socket: SocketAddr,
@ -337,78 +444,50 @@ impl IoImpl for IoUring {
// FIXME(Blocking)
let stream = std::net::TcpStream::connect(socket);
async { Ok(TcpStream(io_uring::types::Fd(stream?.into_raw_fd()))) }
}
async fn tcp_write(&self, stream: &mut Self::TcpStream, buf: &[u8]) -> IoResult<usize> {
let (tx, rx) = async_channel::bounded(1);
let entry = io_uring::opcode::Write::new(stream.0, buf.as_ptr(), buf.len() as u32)
.build()
.user_data(UserData::new_boxed(tx, false).into_u64());
unsafe {
self.uring.submission_shared().push(&entry).unwrap();
async {
Ok(TcpStream {
uring: self.clone(),
fd: io_uring::types::Fd(stream?.into_raw_fd()),
})
}
let entry = rx.recv().await.unwrap();
let write_amt = handle_error(entry.result())?;
Ok(write_amt as usize)
}
}
pub struct Incoming {
io: *const IoUring,
rx: Pin<Box<CqueueEntryReceiver>>,
cb_id: u64,
uring: IoUring,
rb: Pin<Box<ResultBarrier>>,
wait: BarrierWait<'static>,
fd: io_uring::types::Fd,
}
pub fn cancel(io: &IoUring, id: u64) {
let entry = io_uring::opcode::AsyncCancel::new(id).build().user_data(0);
unsafe {
io.uring.submission_shared().push(&entry).unwrap();
}
}
impl AsyncIterator for Incoming {
type Item = IoResult<TcpStream>;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let rx = unsafe { self.map_unchecked_mut(|s| &mut s.rx) };
let mut fut = rx.recv();
let pinned = unsafe { Pin::new_unchecked(&mut fut) };
pinned
.poll(cx)
.map(|entry| {
let fd = handle_error(entry.unwrap().result())?;
Ok(TcpStream(io_uring::types::Fd(fd)))
})
.map(Some)
}
impl Unpin for Incoming {
}
impl Stream for Incoming {
type Item = IoResult<TcpStream>;
fn poll_next(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
AsyncIterator::poll_next(self, cx)
let fut = unsafe { self.as_mut().map_unchecked_mut(|s| &mut s.wait) };
pin!(fut);
fut.poll(cx)
.map(|_| {
let fd = handle_error(self.rb.result())?;
Ok(TcpStream {
uring: self.uring.clone(),
fd: io_uring::types::Fd(fd),
})
})
.map(Some)
}
}
impl Drop for Incoming {
fn drop(&mut self) {
let io = unsafe { &*self.io };
cancel(io, self.cb_id);
self.uring.0.cancel_fd(self.fd);
}
}
@ -433,7 +512,7 @@ pub enum ActiveRequestBehavior {
Pending,
}
pub struct Run<'a, F: Future>(&'a IoUring, F, ActiveRequestBehavior);
pub struct Run<'a, F: Future>(&'a IoUringInner, F, ActiveRequestBehavior);
impl<F: Future> Future for Run<'_, F> {
type Output = F::Output;
@ -496,7 +575,7 @@ mod test {
#[test]
fn simple() {
let uring = IoUring::new().unwrap();
let uring = &IoUring::new().unwrap();
let out = uring.block_on(async { 5 });
assert_eq!(out, 5);
}
@ -505,7 +584,9 @@ mod test {
fn sleep() {
let uring = &IoUring::new().unwrap();
let out = uring.block_on(async {
crate::time::sleep(uring, Duration::from_secs(1)).await;
crate::time::sleep(uring, Duration::from_secs(1))
.await
.unwrap();
5
});
@ -527,7 +608,8 @@ mod test {
crate::time::sleep(uring, Duration::from_secs(1)),
ActiveRequestBehavior::Block,
)
.await;
.await
.unwrap();
5
});
@ -541,7 +623,7 @@ mod test {
use crate::{
aliases::IoResult,
io::{AsyncRead, AsyncWrite},
io::{AsyncReadLoan, AsyncWriteLoan},
io_impl::io_uring::IoUring,
};
use futures_lite::StreamExt;
@ -549,14 +631,14 @@ mod test {
async fn start_echo(
uring: &IoUring,
) -> IoResult<(SocketAddr, impl Future<Output = IoResult<Box<[u8]>>> + '_)> {
let listener = crate::net::TcpListener::bind(uring, "127.0.0.1:0").await?;
let mut listener = crate::net::TcpListener::bind(uring, "127.0.0.1:0").await?;
Ok((listener.local_addr().await?, async {
Ok((listener.local_addr().await?, async move {
let mut incoming = listener.incoming().await?;
let mut stream = incoming.next().await.unwrap()?;
let mut data = vec![0; 4096];
let read_amt = stream.read(&mut data).await?;
let (mut data, read_amt) = stream.read(vec![0; 4096]).await;
let read_amt = read_amt?;
data.truncate(read_amt);
Ok(data.into_boxed_slice())
@ -573,7 +655,8 @@ mod test {
let (addr, read_data) = start_echo(uring).await?;
let mut conn = crate::net::TcpStream::connect(uring, addr).await?;
conn.write(input).await?;
let (_, res) = conn.write(input).await;
res?;
let data = read_data.await?;

View file

@ -1,4 +1,7 @@
use crate::aliases::IoResult;
use crate::{
aliases::IoResult,
io::{AsyncReadLoan, AsyncWriteLoan},
};
use std::{future::Future, net::SocketAddr, time::Duration};
#[cfg(feature = "io_uring")]
@ -23,17 +26,7 @@ pub trait IoImpl {
listener: &mut Self::TcpListener,
) -> impl Future<Output = IoResult<Self::Incoming>>;
type TcpStream;
fn tcp_read(
&self,
stream: &mut Self::TcpStream,
buf: &mut [u8],
) -> impl Future<Output = IoResult<usize>>;
fn tcp_write(
&self,
stream: &mut Self::TcpStream,
buf: &[u8],
) -> impl Future<Output = IoResult<usize>>;
type TcpStream: AsyncReadLoan + AsyncWriteLoan;
fn tcp_connect(
&self,
socket: SocketAddr,

View file

@ -1,7 +1,3 @@
#![feature(async_closure)]
#![feature(async_iterator)]
#![feature(never_type)]
mod aliases;
// pub mod fs;
pub mod io;

View file

@ -1,5 +1,4 @@
use std::{
async_iter::AsyncIterator,
net::{SocketAddr, ToSocketAddrs},
pin::Pin,
};
@ -8,26 +7,26 @@ use futures_lite::Stream;
use crate::{
aliases::IoResult,
io::{AsyncRead, AsyncWrite},
io::{AsyncReadLoan, AsyncWriteLoan, Buffer, BufferMut},
io_impl::IoImpl,
};
pub struct TcpStream<'a, I: IoImpl>(&'a I, I::TcpStream);
impl<'a, I: IoImpl> TcpStream<'a, I> {
pub async fn connect(io: &'a I, addr: SocketAddr) -> IoResult<Self> {
Ok(Self(io, io.tcp_connect(addr).await?))
pub struct TcpStream<I: IoImpl>(I::TcpStream);
impl<I: IoImpl> TcpStream<I> {
pub async fn connect(io: &I, addr: SocketAddr) -> IoResult<Self> {
Ok(Self(io.tcp_connect(addr).await?))
}
}
impl<'a, I: IoImpl> AsyncRead for TcpStream<'a, I> {
async fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
self.0.tcp_read(&mut self.1, buf).await
impl<I: IoImpl> AsyncReadLoan for TcpStream<I> {
async fn read<B: BufferMut>(&mut self, buf: B) -> (B, IoResult<usize>) {
self.0.read(buf).await
}
}
impl<'a, I: IoImpl> AsyncWrite for TcpStream<'a, I> {
async fn write(&mut self, buf: &[u8]) -> IoResult<usize> {
self.0.tcp_write(&mut self.1, buf).await
impl<I: IoImpl> AsyncWriteLoan for TcpStream<I> {
async fn write<B: Buffer>(&mut self, buf: B) -> (B, IoResult<usize>) {
self.0.write(buf).await
}
}
@ -56,43 +55,25 @@ impl<'a, I: IoImpl> TcpListener<'a, I> {
self.0.listener_local_addr(&self.1).await
}
pub async fn incoming(mut self) -> IoResult<Incoming<'a, I>> {
Ok(Incoming(self.0, self.0.accept_many(&mut self.1).await?))
pub async fn incoming(&mut self) -> IoResult<Incoming<I>> {
Ok(Incoming(self.0.accept_many(&mut self.1).await?))
}
}
pub struct Incoming<'a, I: IoImpl>(&'a I, I::Incoming);
pub struct Incoming<I: IoImpl>(I::Incoming);
impl<'a, I: IoImpl> AsyncIterator for Incoming<'a, I>
where
I::Incoming: AsyncIterator<Item = IoResult<I::TcpStream>>,
{
type Item = IoResult<TcpStream<'a, I>>;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let io = self.0;
let inner = unsafe { self.map_unchecked_mut(|s| &mut s.1) };
AsyncIterator::poll_next(inner, cx).map(|o| o.map(|r| r.map(|i| TcpStream(io, i))))
}
}
impl<'a, I: IoImpl> Stream for Incoming<'a, I>
impl<I: IoImpl> Stream for Incoming<I>
where
I::Incoming: Stream<Item = IoResult<I::TcpStream>>,
{
type Item = IoResult<TcpStream<'a, I>>;
type Item = IoResult<TcpStream<I>>;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let io = self.0;
let inner = unsafe { self.map_unchecked_mut(|s| &mut s.1) };
let inner = unsafe { self.map_unchecked_mut(|s| &mut s.0) };
Stream::poll_next(inner, cx).map(|o| o.map(|r| r.map(|i| TcpStream(io, i))))
Stream::poll_next(inner, cx).map(|o| o.map(|r| r.map(|i| TcpStream(i))))
}
}

View file

@ -1,7 +1,7 @@
use std::time::Duration;
use crate::io_impl::IoImpl;
use crate::{aliases::IoResult, io_impl::IoImpl};
pub async fn sleep<I: IoImpl>(io: &I, duration: Duration) {
let _ = io.sleep(duration).await;
pub async fn sleep<I: IoImpl>(io: &I, duration: Duration) -> IoResult<()> {
io.sleep(duration).await
}