From 4997d1f343be20939516683462e06f494b970d76 Mon Sep 17 00:00:00 2001 From: soup Date: Wed, 23 Oct 2024 00:47:56 -0400 Subject: [PATCH] Make value returned by Async(Read|Write)Loan easier to work with --- examples/tcp_echo.rs | 85 ++++++++----------------------------- src/io.rs | 11 +++++ src/io_impl/io_uring/mod.rs | 11 ++--- 3 files changed, 32 insertions(+), 75 deletions(-) diff --git a/examples/tcp_echo.rs b/examples/tcp_echo.rs index fa6a11c..108ad9d 100644 --- a/examples/tcp_echo.rs +++ b/examples/tcp_echo.rs @@ -1,76 +1,25 @@ -/* - -use std::cell::RefCell; - -use futures_lite::{FutureExt, StreamExt as FlStreamExt}; - -use futures_util::{ - select, stream::FuturesUnordered, FutureExt as FuFutureExt, StreamExt as FuStreamExt, +use futures_lite::StreamExt; +use wove::{ + io::{AsyncReadLoan, AsyncWriteLoan, BufferResultExt}, + io_impl::io_uring::IoUring, }; -use wove::{io::AsyncRead, io::AsyncWrite, Wove}; -pub async fn go(wove: &Wove) -> std::io::Result<()> { - let mut listener = wove::net::TcpListener::bind(wove, "127.0.0.1:4838").await?; - let mut incoming = listener.incoming(wove); +pub async fn go(uring: &IoUring) -> std::io::Result<()> { + let mut listener = wove::net::TcpListener::bind(uring, "127.0.0.1:0").await?; + let addr = listener.local_addr().await?; + println!("Listening on {addr}"); + let mut incoming = listener.incoming().await?; - let connections = RefCell::new(FuturesUnordered::new()); - - let mut accept = Box::pin( - async { - while let Some(connection) = FlStreamExt::next(&mut incoming).await { - let mut connection = connection?; - connections.borrow_mut().push(async move { - loop { - let data = connection - .read(wove, vec![0; 4096].into_boxed_slice()) - .await?; - if data.len() == 0 { - break; - } - - connection.write(wove, data).await?; - } - - std::io::Result::Ok(()) - }); - } - - std::io::Result::Ok(()) - } - .fuse(), - ); - - let mut handle = Box::pin( - async { - while let Some(out) = FuStreamExt::next(&mut *connections.borrow_mut()).await { - out?; - } - - std::io::Result::Ok(()) - } - .fuse(), - ); - - loop { - select! { - res = handle => res?, - res = accept => res?, - } + while let Some(conn) = incoming.next().await { + let mut conn = conn?; + let (buf, _read_amt) = conn.read(vec![0; 4096]).await.buf_ok()?; + conn.write(buf).await.buf_ok()?; } + + Ok(()) } pub fn main() { - let wove = Wove::new().unwrap(); - let fut = async { - let run = async { wove.run().await? }; - let go = go(&wove); - - go.race(run).await - }; - - pollster::block_on(fut).unwrap(); -} -*/ - -pub fn main() { + let uring = IoUring::new().unwrap(); + uring.block_on(go(&uring)).unwrap(); } diff --git a/src/io.rs b/src/io.rs index c85a884..af510b9 100644 --- a/src/io.rs +++ b/src/io.rs @@ -3,6 +3,17 @@ use futures_lite::Stream; use crate::aliases::IoResult; use std::{future::Future, mem::MaybeUninit}; +pub trait BufferResultExt { + fn buf_ok(self) -> IoResult<(B, T)>; +} +impl BufferResultExt for (B, IoResult) { + fn buf_ok(self) -> IoResult<(B, T)> { + let (buf, res) = self; + + res.map(|v| (buf, v)) + } +} + pub trait BufferMut { fn as_mut_ptr(&mut self) -> *mut u8; fn writable_bytes(&self) -> usize; diff --git a/src/io_impl/io_uring/mod.rs b/src/io_impl/io_uring/mod.rs index 78310b8..dfa9993 100644 --- a/src/io_impl/io_uring/mod.rs +++ b/src/io_impl/io_uring/mod.rs @@ -600,7 +600,7 @@ mod test { use crate::{ aliases::IoResult, - io::{AsyncReadLoan, AsyncWriteLoan}, + io::{AsyncReadLoan, AsyncWriteLoan, BufferResultExt}, io_impl::io_uring::IoUring, }; use futures_lite::StreamExt; @@ -632,8 +632,7 @@ mod test { let (addr, read_data) = start_echo(uring).await?; let mut conn = crate::net::TcpStream::connect(uring, addr).await?; - let (_, res) = conn.write(input).await; - res?; + conn.write(input).await.buf_ok()?; let data = read_data.await?; @@ -656,13 +655,11 @@ mod test { let write_data = async { let mut stream = listener.incoming().await?.next().await.unwrap()?; - let (_, res) = stream.write("Hello".as_bytes()).await; - res?; + stream.write("Hello".as_bytes()).await.buf_ok()?; crate::time::sleep(uring, Duration::from_millis(500)).await?; - let (_, res) = stream.write(", world".as_bytes()).await; - res?; + stream.write(", world".as_bytes()).await.buf_ok()?; IoResult::Ok(()) };