Compare commits

...

2 commits

Author SHA1 Message Date
soup 8c5b34f107
More reworking 2024-10-21 23:38:48 -04:00
soup 287d3d4933
. 2024-10-21 19:35:35 -04:00
8 changed files with 298 additions and 187 deletions

11
Cargo.lock generated
View file

@ -3,14 +3,13 @@
version = 4 version = 4
[[package]] [[package]]
name = "async-channel" name = "async-lock"
version = "2.3.1" version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18"
dependencies = [ dependencies = [
"concurrent-queue", "event-listener",
"event-listener-strategy", "event-listener-strategy",
"futures-core",
"pin-project-lite", "pin-project-lite",
] ]
@ -126,7 +125,7 @@ checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02"
name = "wove" name = "wove"
version = "0.0.0" version = "0.0.0"
dependencies = [ dependencies = [
"async-channel", "async-lock",
"futures-lite", "futures-lite",
"io-uring", "io-uring",
"libc", "libc",

View file

@ -14,7 +14,7 @@ io-uring = { version = '0.7.1', optional = true }
libc = { version = "0.2.161", optional = true } libc = { version = "0.2.161", optional = true }
futures-lite = { workspace = true, optional = true } futures-lite = { workspace = true, optional = true }
parking = { version = "2.2.1", optional = true } parking = { version = "2.2.1", optional = true }
async-channel = { version = "2.3.1", optional = true } async-lock = { version = "3.4.0", optional = true }
[dev-dependencies] [dev-dependencies]
futures-lite = { workspace = true } futures-lite = { workspace = true }
@ -26,5 +26,5 @@ io_uring = [
"dep:libc", "dep:libc",
"dep:futures-lite", "dep:futures-lite",
"dep:parking", "dep:parking",
"dep:async-channel", "dep:async-lock",
] ]

View file

@ -1,11 +1,70 @@
use crate::aliases::IoResult;
use std::future::Future; use std::future::Future;
use crate::{aliases::IoResult, io_impl::IoImpl}; pub trait BufferMut {
fn as_mut_ptr(&mut self) -> *mut u8;
pub trait AsyncRead { fn writable_bytes(&self) -> usize;
fn read(&mut self, buf: &mut [u8]) -> impl Future<Output = IoResult<usize>>;
} }
pub trait AsyncWrite { impl BufferMut for Vec<u8> {
fn write(&mut self, buf: &[u8]) -> impl Future<Output = IoResult<usize>>; fn as_mut_ptr(&mut self) -> *mut u8 {
self.as_mut_ptr()
}
fn writable_bytes(&self) -> usize {
self.len()
}
}
impl BufferMut for Box<[u8]> {
fn as_mut_ptr(&mut self) -> *mut u8 {
self[..].as_mut_ptr()
}
fn writable_bytes(&self) -> usize {
self.len()
}
}
pub trait AsyncReadLoan {
fn read<B: BufferMut>(&mut self, buf: B) -> impl Future<Output = (B, IoResult<usize>)>;
}
pub trait Buffer {
fn as_ptr(&self) -> *const u8;
fn readable_bytes(&self) -> usize;
}
impl Buffer for Vec<u8> {
fn as_ptr(&self) -> *const u8 {
self.as_ptr()
}
fn readable_bytes(&self) -> usize {
self.len()
}
}
impl Buffer for Box<[u8]> {
fn as_ptr(&self) -> *const u8 {
self[..].as_ptr()
}
fn readable_bytes(&self) -> usize {
self.len()
}
}
impl Buffer for &'static [u8] {
fn as_ptr(&self) -> *const u8 {
self[..].as_ptr()
}
fn readable_bytes(&self) -> usize {
self.len()
}
}
pub trait AsyncWriteLoan {
fn write<B: Buffer>(&mut self, buf: B) -> impl Future<Output = (B, IoResult<usize>)>;
} }

View file

@ -1,36 +1,78 @@
use std::{ use std::{
async_iter::AsyncIterator,
future::Future, future::Future,
net::SocketAddr, net::SocketAddr,
ops::Deref,
os::fd::{FromRawFd, IntoRawFd}, os::fd::{FromRawFd, IntoRawFd},
pin::Pin, pin::Pin,
sync::atomic::{AtomicU64, Ordering}, rc::Rc,
sync::atomic::{AtomicI32, AtomicU64, Ordering},
task::{Context, Poll, Waker}, task::{Context, Poll, Waker},
}; };
use futures_lite::Stream; use async_lock::{futures::BarrierWait, Barrier};
use futures_lite::{pin, Stream};
use parking::Parker; use parking::Parker;
use crate::aliases::IoResult; use crate::{
aliases::IoResult,
io::{AsyncReadLoan, AsyncWriteLoan},
};
use super::IoImpl; use super::IoImpl;
type CqueueEntryReceiver = async_channel::Receiver<io_uring::cqueue::Entry>; #[derive(Debug)]
type CqueueEntrySender = async_channel::Sender<io_uring::cqueue::Entry>; struct ResultBarrier {
pub struct UserData { result: AtomicI32,
tx: CqueueEntrySender, barrier: Barrier,
persist: bool,
} }
impl UserData { impl ResultBarrier {
fn new_boxed(tx: CqueueEntrySender, persist: bool) -> Box<Self> { fn new() -> Self {
Box::new(Self { tx, persist }) Self {
result: AtomicI32::new(0),
barrier: Barrier::new(2),
}
}
async fn wait(&self) {
self.barrier.wait().await;
}
fn result(&self) -> i32 {
self.result.load(Ordering::Relaxed)
}
fn set_result_and_block(&self, v: i32) {
self.result.store(v, Ordering::Relaxed);
self.barrier.wait_blocking();
}
async fn wait_result(&self) -> i32 {
self.wait().await;
self.result()
}
}
#[derive(Debug)]
pub struct UserData<'a> {
persist: bool,
rb: &'a ResultBarrier,
}
impl<'a> UserData<'a> {
fn new_boxed(rb: &'a ResultBarrier, persist: bool) -> Box<Self> {
Box::new(Self { rb, persist })
}
fn new_into_u64(rb: &'a ResultBarrier, persist: bool) -> u64 {
Self::new_boxed(rb, persist).into_u64()
} }
fn into_u64(self: Box<Self>) -> u64 { fn into_u64(self: Box<Self>) -> u64 {
Box::leak(self) as *mut _ as _ Box::leak(self) as *mut _ as _
} }
unsafe fn from_u64(v: u64) -> Box<UserData> { unsafe fn from_u64(v: u64) -> Box<UserData<'a>> {
let v = v as *mut UserData; let v = v as *mut UserData;
unsafe { Box::from_raw(v) } unsafe { Box::from_raw(v) }
@ -51,7 +93,22 @@ pub struct Tick {
active_completions: usize, active_completions: usize,
} }
pub struct IoUring { #[derive(Clone)]
pub struct IoUring(Rc<IoUringInner>);
impl IoUring {
pub fn new() -> IoResult<Self> {
Ok(Self(Rc::new(IoUringInner::new()?)))
}
}
impl Deref for IoUring {
type Target = IoUringInner;
fn deref(&self) -> &Self::Target {
&self.0
}
}
pub struct IoUringInner {
uring: io_uring::IoUring, uring: io_uring::IoUring,
// TODO: analyze the atomic orderings passed to the atomic operations here // TODO: analyze the atomic orderings passed to the atomic operations here
@ -63,7 +120,7 @@ pub struct IoUring {
_pd: core::marker::PhantomData<std::cell::RefCell<()>>, _pd: core::marker::PhantomData<std::cell::RefCell<()>>,
} }
impl IoUring { impl IoUringInner {
pub fn new() -> IoResult<Self> { pub fn new() -> IoResult<Self> {
let uring = io_uring::IoUring::new(256)?; let uring = io_uring::IoUring::new(256)?;
@ -75,6 +132,33 @@ impl IoUring {
}) })
} }
/// Cancel all events for the given fd. Does not return anything, and
/// cancellations are made on a best-effort basis
fn cancel_fd(&self, fd: io_uring::types::Fd) {
let rb = ResultBarrier::new();
let entry =
io_uring::opcode::AsyncCancel2::new(io_uring::types::CancelBuilder::fd(fd).all())
.build()
.user_data(UserData::new_into_u64(&rb, false));
self.queue_op(entry);
}
fn queue_op(&self, op: io_uring::squeue::Entry) {
unsafe { self.uring.submission_shared().push(&op).unwrap() }
}
async fn wait_op(&self, op: io_uring::squeue::Entry) -> IoResult<i32> {
let rb = ResultBarrier::new();
let entry = op.user_data(UserData::new_into_u64(&rb, false));
self.queue_op(entry);
handle_error(rb.wait_result().await)
}
pub fn submit(&self, wait_for: usize) -> IoResult<usize> { pub fn submit(&self, wait_for: usize) -> IoResult<usize> {
let submitted_count = self.uring.submit_and_wait(wait_for)?; let submitted_count = self.uring.submit_and_wait(wait_for)?;
self.active_completions self.active_completions
@ -102,7 +186,7 @@ impl IoUring {
} }
let ud = unsafe { UserData::from_u64(entry.user_data()) }; let ud = unsafe { UserData::from_u64(entry.user_data()) };
ud.tx.send_blocking(entry).unwrap(); ud.rb.set_result_and_block(entry.result());
if ud.persist { if ud.persist {
Box::leak(ud); Box::leak(ud);
} else { } else {
@ -189,33 +273,69 @@ impl Drop for TcpListener {
} }
} }
pub struct TcpStream(io_uring::types::Fd); pub struct TcpStream {
uring: IoUring,
fd: io_uring::types::Fd,
}
impl Drop for TcpStream { impl Drop for TcpStream {
fn drop(&mut self) { fn drop(&mut self) {
unsafe { std::net::TcpStream::from_raw_fd(self.0 .0) }; self.uring.cancel_fd(self.fd);
unsafe { std::net::TcpStream::from_raw_fd(self.fd.0) };
}
}
impl AsyncReadLoan for TcpStream {
async fn read<B: crate::io::BufferMut>(&mut self, mut buf: B) -> (B, IoResult<usize>) {
let res = self
.uring
.0
.wait_op(
io_uring::opcode::Read::new(
self.fd,
buf.as_mut_ptr(),
buf.writable_bytes() as u32,
)
.build(),
)
.await
.map(|v| v as usize);
(buf, res)
}
}
impl AsyncWriteLoan for TcpStream {
async fn write<B: crate::io::Buffer>(&mut self, buf: B) -> (B, IoResult<usize>) {
let res = self
.uring
.0
.wait_op(
io_uring::opcode::Write::new(
self.fd,
buf.as_ptr(),
buf.readable_bytes() as u32,
)
.build(),
)
.await
.map(|v| v as usize);
(buf, res)
} }
} }
impl IoImpl for IoUring { impl IoImpl for IoUring {
async fn sleep(&self, duration: std::time::Duration) -> IoResult<()> { async fn sleep(&self, duration: std::time::Duration) -> IoResult<()> {
let (tx, rx) = async_channel::bounded(1);
let ts = io_uring::types::Timespec::new() let ts = io_uring::types::Timespec::new()
.sec(duration.as_secs()) .sec(duration.as_secs())
.nsec(duration.subsec_nanos()); .nsec(duration.subsec_nanos());
let entry = io_uring::opcode::Timeout::new(&ts as *const _) let entry = io_uring::opcode::Timeout::new(&ts as *const _).build();
.build()
.user_data(UserData::new_boxed(tx, false).into_u64());
unsafe { self.uring.submission_shared().push(&entry).unwrap() } let _ = self.0.wait_op(entry).await;
let entry = rx.recv().await.unwrap();
handle_error(entry.result())?;
Ok(()) Ok(())
} }
type TcpListener = io_uring::types::Fd; type TcpListener = TcpListener;
fn open_tcp_socket( fn open_tcp_socket(
&self, &self,
addr: std::net::SocketAddr, addr: std::net::SocketAddr,
@ -226,7 +346,7 @@ impl IoImpl for IoUring {
// the std implementation and cast it to a FileHandle // the std implementation and cast it to a FileHandle
let listener = std::net::TcpListener::bind(addr); let listener = std::net::TcpListener::bind(addr);
async { IoResult::Ok(io_uring::types::Fd(listener?.into_raw_fd())) } async { IoResult::Ok(TcpListener(io_uring::types::Fd(listener?.into_raw_fd()))) }
/* /*
// let (tx, rx) = async_channel::bounded(1); // let (tx, rx) = async_channel::bounded(1);
@ -279,7 +399,7 @@ impl IoImpl for IoUring {
async fn listener_local_addr(&self, listener: &Self::TcpListener) -> IoResult<SocketAddr> { async fn listener_local_addr(&self, listener: &Self::TcpListener) -> IoResult<SocketAddr> {
// FIXME(Blocking) // FIXME(Blocking)
let listener = unsafe { std::net::TcpListener::from_raw_fd(listener.0) }; let listener = unsafe { std::net::TcpListener::from_raw_fd(listener.0 .0) };
let addr = listener.local_addr()?; let addr = listener.local_addr()?;
let _ = listener.into_raw_fd(); let _ = listener.into_raw_fd();
@ -291,45 +411,32 @@ impl IoImpl for IoUring {
&self, &self,
listener: &mut Self::TcpListener, listener: &mut Self::TcpListener,
) -> impl Future<Output = IoResult<Self::Incoming>> { ) -> impl Future<Output = IoResult<Self::Incoming>> {
let (tx, rx) = async_channel::unbounded(); let rb = Box::pin(ResultBarrier::new());
let cb_id = UserData::new_boxed(tx, true).into_u64(); let cb_id = UserData::new_into_u64(&rb, true);
let entry = io_uring::opcode::AcceptMulti::new(*listener) let entry = io_uring::opcode::AcceptMulti::new(listener.0)
.build() .build()
.user_data(cb_id); .user_data(cb_id);
unsafe { unsafe {
self.uring.submission_shared().push(&entry).unwrap(); self.0.uring.submission_shared().push(&entry).unwrap();
} }
let wait = unsafe {
core::mem::transmute::<BarrierWait<'_>, BarrierWait<'_>>(rb.barrier.wait())
};
async move { async move {
Ok(Incoming { Ok(Incoming {
io: self as *const _, uring: self.clone(),
rx: Box::pin(rx), rb,
cb_id, wait,
fd: listener.0,
}) })
} }
} }
type TcpStream = TcpStream; type TcpStream = TcpStream;
async fn tcp_read(&self, stream: &mut Self::TcpStream, buf: &mut [u8]) -> IoResult<usize> {
let (tx, rx) = async_channel::bounded(1);
let entry = io_uring::opcode::Read::new(stream.0, buf.as_mut_ptr(), buf.len() as u32)
.build()
.user_data(UserData::new_boxed(tx, false).into_u64());
unsafe {
self.uring.submission_shared().push(&entry).unwrap();
}
let entry = rx.recv().await.unwrap();
let read_amt = handle_error(entry.result())?;
Ok(read_amt as usize)
}
fn tcp_connect( fn tcp_connect(
&self, &self,
socket: SocketAddr, socket: SocketAddr,
@ -337,78 +444,50 @@ impl IoImpl for IoUring {
// FIXME(Blocking) // FIXME(Blocking)
let stream = std::net::TcpStream::connect(socket); let stream = std::net::TcpStream::connect(socket);
async { Ok(TcpStream(io_uring::types::Fd(stream?.into_raw_fd()))) } async {
Ok(TcpStream {
uring: self.clone(),
fd: io_uring::types::Fd(stream?.into_raw_fd()),
})
} }
async fn tcp_write(&self, stream: &mut Self::TcpStream, buf: &[u8]) -> IoResult<usize> {
let (tx, rx) = async_channel::bounded(1);
let entry = io_uring::opcode::Write::new(stream.0, buf.as_ptr(), buf.len() as u32)
.build()
.user_data(UserData::new_boxed(tx, false).into_u64());
unsafe {
self.uring.submission_shared().push(&entry).unwrap();
}
let entry = rx.recv().await.unwrap();
let write_amt = handle_error(entry.result())?;
Ok(write_amt as usize)
} }
} }
pub struct Incoming { pub struct Incoming {
io: *const IoUring, uring: IoUring,
rx: Pin<Box<CqueueEntryReceiver>>, rb: Pin<Box<ResultBarrier>>,
cb_id: u64, wait: BarrierWait<'static>,
} fd: io_uring::types::Fd,
pub fn cancel(io: &IoUring, id: u64) {
let entry = io_uring::opcode::AsyncCancel::new(id).build().user_data(0);
unsafe {
io.uring.submission_shared().push(&entry).unwrap();
}
}
impl AsyncIterator for Incoming {
type Item = IoResult<TcpStream>;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let rx = unsafe { self.map_unchecked_mut(|s| &mut s.rx) };
let mut fut = rx.recv();
let pinned = unsafe { Pin::new_unchecked(&mut fut) };
pinned
.poll(cx)
.map(|entry| {
let fd = handle_error(entry.unwrap().result())?;
Ok(TcpStream(io_uring::types::Fd(fd)))
})
.map(Some)
} }
impl Unpin for Incoming {
} }
impl Stream for Incoming { impl Stream for Incoming {
type Item = IoResult<TcpStream>; type Item = IoResult<TcpStream>;
fn poll_next( fn poll_next(
self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>, cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> { ) -> std::task::Poll<Option<Self::Item>> {
AsyncIterator::poll_next(self, cx) let fut = unsafe { self.as_mut().map_unchecked_mut(|s| &mut s.wait) };
pin!(fut);
fut.poll(cx)
.map(|_| {
let fd = handle_error(self.rb.result())?;
Ok(TcpStream {
uring: self.uring.clone(),
fd: io_uring::types::Fd(fd),
})
})
.map(Some)
} }
} }
impl Drop for Incoming { impl Drop for Incoming {
fn drop(&mut self) { fn drop(&mut self) {
let io = unsafe { &*self.io }; self.uring.0.cancel_fd(self.fd);
cancel(io, self.cb_id);
} }
} }
@ -433,7 +512,7 @@ pub enum ActiveRequestBehavior {
Pending, Pending,
} }
pub struct Run<'a, F: Future>(&'a IoUring, F, ActiveRequestBehavior); pub struct Run<'a, F: Future>(&'a IoUringInner, F, ActiveRequestBehavior);
impl<F: Future> Future for Run<'_, F> { impl<F: Future> Future for Run<'_, F> {
type Output = F::Output; type Output = F::Output;
@ -496,7 +575,7 @@ mod test {
#[test] #[test]
fn simple() { fn simple() {
let uring = IoUring::new().unwrap(); let uring = &IoUring::new().unwrap();
let out = uring.block_on(async { 5 }); let out = uring.block_on(async { 5 });
assert_eq!(out, 5); assert_eq!(out, 5);
} }
@ -505,7 +584,9 @@ mod test {
fn sleep() { fn sleep() {
let uring = &IoUring::new().unwrap(); let uring = &IoUring::new().unwrap();
let out = uring.block_on(async { let out = uring.block_on(async {
crate::time::sleep(uring, Duration::from_secs(1)).await; crate::time::sleep(uring, Duration::from_secs(1))
.await
.unwrap();
5 5
}); });
@ -527,7 +608,8 @@ mod test {
crate::time::sleep(uring, Duration::from_secs(1)), crate::time::sleep(uring, Duration::from_secs(1)),
ActiveRequestBehavior::Block, ActiveRequestBehavior::Block,
) )
.await; .await
.unwrap();
5 5
}); });
@ -541,7 +623,7 @@ mod test {
use crate::{ use crate::{
aliases::IoResult, aliases::IoResult,
io::{AsyncRead, AsyncWrite}, io::{AsyncReadLoan, AsyncWriteLoan},
io_impl::io_uring::IoUring, io_impl::io_uring::IoUring,
}; };
use futures_lite::StreamExt; use futures_lite::StreamExt;
@ -549,14 +631,14 @@ mod test {
async fn start_echo( async fn start_echo(
uring: &IoUring, uring: &IoUring,
) -> IoResult<(SocketAddr, impl Future<Output = IoResult<Box<[u8]>>> + '_)> { ) -> IoResult<(SocketAddr, impl Future<Output = IoResult<Box<[u8]>>> + '_)> {
let listener = crate::net::TcpListener::bind(uring, "127.0.0.1:0").await?; let mut listener = crate::net::TcpListener::bind(uring, "127.0.0.1:0").await?;
Ok((listener.local_addr().await?, async { Ok((listener.local_addr().await?, async move {
let mut incoming = listener.incoming().await?; let mut incoming = listener.incoming().await?;
let mut stream = incoming.next().await.unwrap()?; let mut stream = incoming.next().await.unwrap()?;
let mut data = vec![0; 4096]; let (mut data, read_amt) = stream.read(vec![0; 4096]).await;
let read_amt = stream.read(&mut data).await?; let read_amt = read_amt?;
data.truncate(read_amt); data.truncate(read_amt);
Ok(data.into_boxed_slice()) Ok(data.into_boxed_slice())
@ -573,7 +655,8 @@ mod test {
let (addr, read_data) = start_echo(uring).await?; let (addr, read_data) = start_echo(uring).await?;
let mut conn = crate::net::TcpStream::connect(uring, addr).await?; let mut conn = crate::net::TcpStream::connect(uring, addr).await?;
conn.write(input).await?; let (_, res) = conn.write(input).await;
res?;
let data = read_data.await?; let data = read_data.await?;

View file

@ -1,4 +1,7 @@
use crate::aliases::IoResult; use crate::{
aliases::IoResult,
io::{AsyncReadLoan, AsyncWriteLoan},
};
use std::{future::Future, net::SocketAddr, time::Duration}; use std::{future::Future, net::SocketAddr, time::Duration};
#[cfg(feature = "io_uring")] #[cfg(feature = "io_uring")]
@ -23,17 +26,7 @@ pub trait IoImpl {
listener: &mut Self::TcpListener, listener: &mut Self::TcpListener,
) -> impl Future<Output = IoResult<Self::Incoming>>; ) -> impl Future<Output = IoResult<Self::Incoming>>;
type TcpStream; type TcpStream: AsyncReadLoan + AsyncWriteLoan;
fn tcp_read(
&self,
stream: &mut Self::TcpStream,
buf: &mut [u8],
) -> impl Future<Output = IoResult<usize>>;
fn tcp_write(
&self,
stream: &mut Self::TcpStream,
buf: &[u8],
) -> impl Future<Output = IoResult<usize>>;
fn tcp_connect( fn tcp_connect(
&self, &self,
socket: SocketAddr, socket: SocketAddr,

View file

@ -1,7 +1,3 @@
#![feature(async_closure)]
#![feature(async_iterator)]
#![feature(never_type)]
mod aliases; mod aliases;
// pub mod fs; // pub mod fs;
pub mod io; pub mod io;

View file

@ -1,5 +1,4 @@
use std::{ use std::{
async_iter::AsyncIterator,
net::{SocketAddr, ToSocketAddrs}, net::{SocketAddr, ToSocketAddrs},
pin::Pin, pin::Pin,
}; };
@ -8,26 +7,26 @@ use futures_lite::Stream;
use crate::{ use crate::{
aliases::IoResult, aliases::IoResult,
io::{AsyncRead, AsyncWrite}, io::{AsyncReadLoan, AsyncWriteLoan, Buffer, BufferMut},
io_impl::IoImpl, io_impl::IoImpl,
}; };
pub struct TcpStream<'a, I: IoImpl>(&'a I, I::TcpStream); pub struct TcpStream<I: IoImpl>(I::TcpStream);
impl<'a, I: IoImpl> TcpStream<'a, I> { impl<I: IoImpl> TcpStream<I> {
pub async fn connect(io: &'a I, addr: SocketAddr) -> IoResult<Self> { pub async fn connect(io: &I, addr: SocketAddr) -> IoResult<Self> {
Ok(Self(io, io.tcp_connect(addr).await?)) Ok(Self(io.tcp_connect(addr).await?))
} }
} }
impl<'a, I: IoImpl> AsyncRead for TcpStream<'a, I> { impl<I: IoImpl> AsyncReadLoan for TcpStream<I> {
async fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> { async fn read<B: BufferMut>(&mut self, buf: B) -> (B, IoResult<usize>) {
self.0.tcp_read(&mut self.1, buf).await self.0.read(buf).await
} }
} }
impl<'a, I: IoImpl> AsyncWrite for TcpStream<'a, I> { impl<I: IoImpl> AsyncWriteLoan for TcpStream<I> {
async fn write(&mut self, buf: &[u8]) -> IoResult<usize> { async fn write<B: Buffer>(&mut self, buf: B) -> (B, IoResult<usize>) {
self.0.tcp_write(&mut self.1, buf).await self.0.write(buf).await
} }
} }
@ -56,43 +55,25 @@ impl<'a, I: IoImpl> TcpListener<'a, I> {
self.0.listener_local_addr(&self.1).await self.0.listener_local_addr(&self.1).await
} }
pub async fn incoming(mut self) -> IoResult<Incoming<'a, I>> { pub async fn incoming(&mut self) -> IoResult<Incoming<I>> {
Ok(Incoming(self.0, self.0.accept_many(&mut self.1).await?)) Ok(Incoming(self.0.accept_many(&mut self.1).await?))
} }
} }
pub struct Incoming<'a, I: IoImpl>(&'a I, I::Incoming); pub struct Incoming<I: IoImpl>(I::Incoming);
impl<'a, I: IoImpl> AsyncIterator for Incoming<'a, I> impl<I: IoImpl> Stream for Incoming<I>
where
I::Incoming: AsyncIterator<Item = IoResult<I::TcpStream>>,
{
type Item = IoResult<TcpStream<'a, I>>;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let io = self.0;
let inner = unsafe { self.map_unchecked_mut(|s| &mut s.1) };
AsyncIterator::poll_next(inner, cx).map(|o| o.map(|r| r.map(|i| TcpStream(io, i))))
}
}
impl<'a, I: IoImpl> Stream for Incoming<'a, I>
where where
I::Incoming: Stream<Item = IoResult<I::TcpStream>>, I::Incoming: Stream<Item = IoResult<I::TcpStream>>,
{ {
type Item = IoResult<TcpStream<'a, I>>; type Item = IoResult<TcpStream<I>>;
fn poll_next( fn poll_next(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>, cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> { ) -> std::task::Poll<Option<Self::Item>> {
let io = self.0; let inner = unsafe { self.map_unchecked_mut(|s| &mut s.0) };
let inner = unsafe { self.map_unchecked_mut(|s| &mut s.1) };
Stream::poll_next(inner, cx).map(|o| o.map(|r| r.map(|i| TcpStream(io, i)))) Stream::poll_next(inner, cx).map(|o| o.map(|r| r.map(|i| TcpStream(i))))
} }
} }

View file

@ -1,7 +1,7 @@
use std::time::Duration; use std::time::Duration;
use crate::io_impl::IoImpl; use crate::{aliases::IoResult, io_impl::IoImpl};
pub async fn sleep<I: IoImpl>(io: &I, duration: Duration) { pub async fn sleep<I: IoImpl>(io: &I, duration: Duration) -> IoResult<()> {
let _ = io.sleep(duration).await; io.sleep(duration).await
} }