Working TCP echo
This commit is contained in:
parent
4997d1f343
commit
54c25d5580
|
|
@ -1,20 +1,42 @@
|
||||||
use futures_lite::StreamExt;
|
use futures_lite::{future::zip, StreamExt};
|
||||||
use wove::{
|
use wove::{
|
||||||
io::{AsyncReadLoan, AsyncWriteLoan, BufferResultExt},
|
futures::Nexus,
|
||||||
|
io::{AsyncReadLoan, AsyncWriteLoan, Transpose},
|
||||||
io_impl::io_uring::IoUring,
|
io_impl::io_uring::IoUring,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub async fn go(uring: &IoUring) -> std::io::Result<()> {
|
pub async fn go(uring: &IoUring) -> std::io::Result<()> {
|
||||||
let mut listener = wove::net::TcpListener::bind(uring, "127.0.0.1:0").await?;
|
let mut listener = wove::net::TcpListener::bind(uring, "127.0.0.1:0").await?;
|
||||||
let addr = listener.local_addr().await?;
|
let addr = listener.local_addr().await?;
|
||||||
println!("Listening on {addr}");
|
println!("{addr}");
|
||||||
let mut incoming = listener.incoming().await?;
|
let mut incoming = listener.incoming().await?;
|
||||||
|
|
||||||
|
let handlers = Nexus::new();
|
||||||
|
|
||||||
|
let accept = async {
|
||||||
while let Some(conn) = incoming.next().await {
|
while let Some(conn) = incoming.next().await {
|
||||||
let mut conn = conn?;
|
let mut conn = conn?;
|
||||||
let (buf, _read_amt) = conn.read(vec![0; 4096]).await.buf_ok()?;
|
|
||||||
conn.write(buf).await.buf_ok()?;
|
handlers.push(Box::pin(async move {
|
||||||
|
loop {
|
||||||
|
let (buf, read_amt) = conn.read(vec![0; 4096]).await.transpose()?;
|
||||||
|
if read_amt == 0 {
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
conn.write(buf).await.transpose()?;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::io::Result::Ok(())
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
std::io::Result::Ok(())
|
||||||
|
};
|
||||||
|
|
||||||
|
let handle_connections = handlers.stream().fuse().last();
|
||||||
|
let (accept_result, handle_result) = zip(accept, handle_connections).await;
|
||||||
|
accept_result?;
|
||||||
|
handle_result.transpose()?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,12 +1,12 @@
|
||||||
use std::{
|
use std::{
|
||||||
cell::RefCell,
|
cell::{RefCell, UnsafeCell},
|
||||||
future::Future,
|
future::Future,
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
rc::Rc,
|
rc::Rc,
|
||||||
task::{Context, Poll, Waker},
|
task::{Context, Poll, Waker},
|
||||||
};
|
};
|
||||||
|
|
||||||
use futures_lite::Stream;
|
use futures_lite::{FutureExt, Stream};
|
||||||
|
|
||||||
type Slot<T> = Rc<RefCell<Option<T>>>;
|
type Slot<T> = Rc<RefCell<Option<T>>>;
|
||||||
|
|
||||||
|
|
@ -117,3 +117,52 @@ pub fn handoff<T>() -> (Put<T>, Get<T>) {
|
||||||
|
|
||||||
(put, get)
|
(put, get)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct Nexus<Fut>(UnsafeCell<Vec<Fut>>);
|
||||||
|
impl<Fut> Nexus<Fut> {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self(UnsafeCell::new(Vec::new()))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn push(&self, fut: Fut) {
|
||||||
|
unsafe { &mut *self.0.get() }.push(fut);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Fut> Default for Nexus<Fut> {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::new()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Fut: Future> Nexus<Fut> {
|
||||||
|
pub fn stream(&self) -> NexusStream<'_, Fut> {
|
||||||
|
NexusStream(self)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct NexusStream<'a, Fut>(&'a Nexus<Fut>);
|
||||||
|
impl<Fut: Future + Unpin> Stream for NexusStream<'_, Fut> {
|
||||||
|
type Item = Fut::Output;
|
||||||
|
|
||||||
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
|
let futures = unsafe { &mut *self.0 .0.get() };
|
||||||
|
|
||||||
|
let out = futures.iter_mut().enumerate().find_map(|(idx, fut)| {
|
||||||
|
if let Poll::Ready(v) = fut.poll(cx) {
|
||||||
|
return Some((idx, v));
|
||||||
|
}
|
||||||
|
|
||||||
|
None
|
||||||
|
});
|
||||||
|
|
||||||
|
match out {
|
||||||
|
None => Poll::Pending,
|
||||||
|
Some((idx, v)) => {
|
||||||
|
futures.swap_remove(idx);
|
||||||
|
|
||||||
|
Poll::Ready(Some(v))
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -3,11 +3,11 @@ use futures_lite::Stream;
|
||||||
use crate::aliases::IoResult;
|
use crate::aliases::IoResult;
|
||||||
use std::{future::Future, mem::MaybeUninit};
|
use std::{future::Future, mem::MaybeUninit};
|
||||||
|
|
||||||
pub trait BufferResultExt<B, T> {
|
pub trait Transpose<B, T> {
|
||||||
fn buf_ok(self) -> IoResult<(B, T)>;
|
fn transpose(self) -> IoResult<(B, T)>;
|
||||||
}
|
}
|
||||||
impl<B, T> BufferResultExt<B, T> for (B, IoResult<T>) {
|
impl<B, T> Transpose<B, T> for (B, IoResult<T>) {
|
||||||
fn buf_ok(self) -> IoResult<(B, T)> {
|
fn transpose(self) -> IoResult<(B, T)> {
|
||||||
let (buf, res) = self;
|
let (buf, res) = self;
|
||||||
|
|
||||||
res.map(|v| (buf, v))
|
res.map(|v| (buf, v))
|
||||||
|
|
|
||||||
|
|
@ -600,7 +600,7 @@ mod test {
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
aliases::IoResult,
|
aliases::IoResult,
|
||||||
io::{AsyncReadLoan, AsyncWriteLoan, BufferResultExt},
|
io::{AsyncReadLoan, AsyncWriteLoan, Transpose},
|
||||||
io_impl::io_uring::IoUring,
|
io_impl::io_uring::IoUring,
|
||||||
};
|
};
|
||||||
use futures_lite::StreamExt;
|
use futures_lite::StreamExt;
|
||||||
|
|
@ -632,7 +632,7 @@ mod test {
|
||||||
let (addr, read_data) = start_echo(uring).await?;
|
let (addr, read_data) = start_echo(uring).await?;
|
||||||
let mut conn = crate::net::TcpStream::connect(uring, addr).await?;
|
let mut conn = crate::net::TcpStream::connect(uring, addr).await?;
|
||||||
|
|
||||||
conn.write(input).await.buf_ok()?;
|
conn.write(input).await.transpose()?;
|
||||||
|
|
||||||
let data = read_data.await?;
|
let data = read_data.await?;
|
||||||
|
|
||||||
|
|
@ -655,11 +655,11 @@ mod test {
|
||||||
|
|
||||||
let write_data = async {
|
let write_data = async {
|
||||||
let mut stream = listener.incoming().await?.next().await.unwrap()?;
|
let mut stream = listener.incoming().await?.next().await.unwrap()?;
|
||||||
stream.write("Hello".as_bytes()).await.buf_ok()?;
|
stream.write("Hello".as_bytes()).await.transpose()?;
|
||||||
|
|
||||||
crate::time::sleep(uring, Duration::from_millis(500)).await?;
|
crate::time::sleep(uring, Duration::from_millis(500)).await?;
|
||||||
|
|
||||||
stream.write(", world".as_bytes()).await.buf_ok()?;
|
stream.write(", world".as_bytes()).await.transpose()?;
|
||||||
|
|
||||||
IoResult::Ok(())
|
IoResult::Ok(())
|
||||||
};
|
};
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue