Make value returned by Async(Read|Write)Loan easier to work with
This commit is contained in:
parent
5afba95f02
commit
4997d1f343
|
|
@ -1,76 +1,25 @@
|
||||||
/*
|
use futures_lite::StreamExt;
|
||||||
|
use wove::{
|
||||||
use std::cell::RefCell;
|
io::{AsyncReadLoan, AsyncWriteLoan, BufferResultExt},
|
||||||
|
io_impl::io_uring::IoUring,
|
||||||
use futures_lite::{FutureExt, StreamExt as FlStreamExt};
|
|
||||||
|
|
||||||
use futures_util::{
|
|
||||||
select, stream::FuturesUnordered, FutureExt as FuFutureExt, StreamExt as FuStreamExt,
|
|
||||||
};
|
};
|
||||||
use wove::{io::AsyncRead, io::AsyncWrite, Wove};
|
|
||||||
|
|
||||||
pub async fn go(wove: &Wove) -> std::io::Result<()> {
|
pub async fn go(uring: &IoUring) -> std::io::Result<()> {
|
||||||
let mut listener = wove::net::TcpListener::bind(wove, "127.0.0.1:4838").await?;
|
let mut listener = wove::net::TcpListener::bind(uring, "127.0.0.1:0").await?;
|
||||||
let mut incoming = listener.incoming(wove);
|
let addr = listener.local_addr().await?;
|
||||||
|
println!("Listening on {addr}");
|
||||||
|
let mut incoming = listener.incoming().await?;
|
||||||
|
|
||||||
let connections = RefCell::new(FuturesUnordered::new());
|
while let Some(conn) = incoming.next().await {
|
||||||
|
let mut conn = conn?;
|
||||||
let mut accept = Box::pin(
|
let (buf, _read_amt) = conn.read(vec![0; 4096]).await.buf_ok()?;
|
||||||
async {
|
conn.write(buf).await.buf_ok()?;
|
||||||
while let Some(connection) = FlStreamExt::next(&mut incoming).await {
|
|
||||||
let mut connection = connection?;
|
|
||||||
connections.borrow_mut().push(async move {
|
|
||||||
loop {
|
|
||||||
let data = connection
|
|
||||||
.read(wove, vec![0; 4096].into_boxed_slice())
|
|
||||||
.await?;
|
|
||||||
if data.len() == 0 {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
connection.write(wove, data).await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
std::io::Result::Ok(())
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
std::io::Result::Ok(())
|
|
||||||
}
|
|
||||||
.fuse(),
|
|
||||||
);
|
|
||||||
|
|
||||||
let mut handle = Box::pin(
|
|
||||||
async {
|
|
||||||
while let Some(out) = FuStreamExt::next(&mut *connections.borrow_mut()).await {
|
|
||||||
out?;
|
|
||||||
}
|
|
||||||
|
|
||||||
std::io::Result::Ok(())
|
|
||||||
}
|
|
||||||
.fuse(),
|
|
||||||
);
|
|
||||||
|
|
||||||
loop {
|
|
||||||
select! {
|
|
||||||
res = handle => res?,
|
|
||||||
res = accept => res?,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn main() {
|
pub fn main() {
|
||||||
let wove = Wove::new().unwrap();
|
let uring = IoUring::new().unwrap();
|
||||||
let fut = async {
|
uring.block_on(go(&uring)).unwrap();
|
||||||
let run = async { wove.run().await? };
|
|
||||||
let go = go(&wove);
|
|
||||||
|
|
||||||
go.race(run).await
|
|
||||||
};
|
|
||||||
|
|
||||||
pollster::block_on(fut).unwrap();
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
pub fn main() {
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
11
src/io.rs
11
src/io.rs
|
|
@ -3,6 +3,17 @@ use futures_lite::Stream;
|
||||||
use crate::aliases::IoResult;
|
use crate::aliases::IoResult;
|
||||||
use std::{future::Future, mem::MaybeUninit};
|
use std::{future::Future, mem::MaybeUninit};
|
||||||
|
|
||||||
|
pub trait BufferResultExt<B, T> {
|
||||||
|
fn buf_ok(self) -> IoResult<(B, T)>;
|
||||||
|
}
|
||||||
|
impl<B, T> BufferResultExt<B, T> for (B, IoResult<T>) {
|
||||||
|
fn buf_ok(self) -> IoResult<(B, T)> {
|
||||||
|
let (buf, res) = self;
|
||||||
|
|
||||||
|
res.map(|v| (buf, v))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub trait BufferMut {
|
pub trait BufferMut {
|
||||||
fn as_mut_ptr(&mut self) -> *mut u8;
|
fn as_mut_ptr(&mut self) -> *mut u8;
|
||||||
fn writable_bytes(&self) -> usize;
|
fn writable_bytes(&self) -> usize;
|
||||||
|
|
|
||||||
|
|
@ -600,7 +600,7 @@ mod test {
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
aliases::IoResult,
|
aliases::IoResult,
|
||||||
io::{AsyncReadLoan, AsyncWriteLoan},
|
io::{AsyncReadLoan, AsyncWriteLoan, BufferResultExt},
|
||||||
io_impl::io_uring::IoUring,
|
io_impl::io_uring::IoUring,
|
||||||
};
|
};
|
||||||
use futures_lite::StreamExt;
|
use futures_lite::StreamExt;
|
||||||
|
|
@ -632,8 +632,7 @@ mod test {
|
||||||
let (addr, read_data) = start_echo(uring).await?;
|
let (addr, read_data) = start_echo(uring).await?;
|
||||||
let mut conn = crate::net::TcpStream::connect(uring, addr).await?;
|
let mut conn = crate::net::TcpStream::connect(uring, addr).await?;
|
||||||
|
|
||||||
let (_, res) = conn.write(input).await;
|
conn.write(input).await.buf_ok()?;
|
||||||
res?;
|
|
||||||
|
|
||||||
let data = read_data.await?;
|
let data = read_data.await?;
|
||||||
|
|
||||||
|
|
@ -656,13 +655,11 @@ mod test {
|
||||||
|
|
||||||
let write_data = async {
|
let write_data = async {
|
||||||
let mut stream = listener.incoming().await?.next().await.unwrap()?;
|
let mut stream = listener.incoming().await?.next().await.unwrap()?;
|
||||||
let (_, res) = stream.write("Hello".as_bytes()).await;
|
stream.write("Hello".as_bytes()).await.buf_ok()?;
|
||||||
res?;
|
|
||||||
|
|
||||||
crate::time::sleep(uring, Duration::from_millis(500)).await?;
|
crate::time::sleep(uring, Duration::from_millis(500)).await?;
|
||||||
|
|
||||||
let (_, res) = stream.write(", world".as_bytes()).await;
|
stream.write(", world".as_bytes()).await.buf_ok()?;
|
||||||
res?;
|
|
||||||
|
|
||||||
IoResult::Ok(())
|
IoResult::Ok(())
|
||||||
};
|
};
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue