Refactor AsyncReadLoan

This commit is contained in:
soup 2024-10-23 00:39:21 -04:00
parent 520f3fd949
commit 5afba95f02
No known key found for this signature in database
3 changed files with 57 additions and 12 deletions

View file

@ -1,11 +1,12 @@
use futures_lite::Stream; use futures_lite::Stream;
use crate::aliases::IoResult; use crate::aliases::IoResult;
use std::future::Future; use std::{future::Future, mem::MaybeUninit};
pub trait BufferMut { pub trait BufferMut {
fn as_mut_ptr(&mut self) -> *mut u8; fn as_mut_ptr(&mut self) -> *mut u8;
fn writable_bytes(&self) -> usize; fn writable_bytes(&self) -> usize;
fn resize_to_fit(&mut self, written_bytes: usize);
} }
impl BufferMut for Vec<u8> { impl BufferMut for Vec<u8> {
@ -14,7 +15,29 @@ impl BufferMut for Vec<u8> {
} }
fn writable_bytes(&self) -> usize { fn writable_bytes(&self) -> usize {
self.len() self.capacity()
}
fn resize_to_fit(&mut self, written_bytes: usize) {
if written_bytes > self.capacity() {
panic!();
}
unsafe { self.set_len(written_bytes) };
}
}
fn resize_box<T>(bx: &mut Box<[T]>, size: usize) {
let mut bx_tmp = MaybeUninit::uninit();
let bx_tmp_ptr: *mut Box<[T]> = bx_tmp.as_mut_ptr();
let bx_tmp = unsafe {
bx_tmp_ptr.copy_from_nonoverlapping(bx as *const _, 1);
bx_tmp.assume_init()
};
let mut vec = bx_tmp.into_vec();
vec.truncate(size);
unsafe {
(bx as *mut Box<[T]>).copy_from_nonoverlapping(&vec.into_boxed_slice(), 1);
} }
} }
@ -26,21 +49,43 @@ impl BufferMut for Box<[u8]> {
fn writable_bytes(&self) -> usize { fn writable_bytes(&self) -> usize {
self.len() self.len()
} }
fn resize_to_fit(&mut self, written_bytes: usize) {
resize_box(self, written_bytes)
}
} }
pub trait AsyncReadLoan { pub trait AsyncReadLoan {
fn read<B: BufferMut>(&mut self, buf: B) -> impl Future<Output = (B, IoResult<usize>)>; fn read_no_resize<B: BufferMut>(
&mut self,
buf: B,
) -> impl Future<Output = (B, IoResult<usize>)>;
fn read<B: BufferMut>(&mut self, buf: B) -> impl Future<Output = (B, IoResult<usize>)> {
async {
let (mut buf, amt) = self.read_no_resize(buf).await;
let (buf, amt) = match amt {
Ok(amt) => (
{
buf.resize_to_fit(amt);
buf
},
Ok(amt),
),
Err(e) => (buf, Err(e)),
};
(buf, amt)
}
}
fn read_stream(&mut self) -> impl Stream<Item = IoResult<Vec<u8>>> { fn read_stream(&mut self) -> impl Stream<Item = IoResult<Vec<u8>>> {
futures_lite::stream::unfold(self, move |s| async move { futures_lite::stream::unfold(self, move |s| async move {
let (mut buf, res) = s.read(vec![0; 4096]).await; let (buf, res) = s.read(vec![0; 4096]).await;
match res { match res {
Err(e) => Some((Err(e), s)), Err(e) => Some((Err(e), s)),
Ok(0) => None, Ok(0) => None,
Ok(read_amt) => { Ok(_) => Some((Ok(buf), s)),
buf.truncate(read_amt);
Some((Ok(buf), s))
},
} }
}) })
} }

View file

@ -16,7 +16,7 @@ use parking::Parker;
use crate::{ use crate::{
aliases::IoResult, aliases::IoResult,
futures::{handoff, Get, GetFut, Put}, futures::{handoff, Get, GetFut, Put},
io::{AsyncReadLoan, AsyncWriteLoan}, io::{AsyncReadLoan, AsyncWriteLoan, Buffer, BufferMut},
}; };
use super::IoImpl; use super::IoImpl;
@ -281,7 +281,7 @@ impl Drop for TcpStream {
} }
} }
impl AsyncReadLoan for TcpStream { impl AsyncReadLoan for TcpStream {
async fn read<B: crate::io::BufferMut>(&mut self, mut buf: B) -> (B, IoResult<usize>) { async fn read_no_resize<B: BufferMut>(&mut self, mut buf: B) -> (B, IoResult<usize>) {
let res = self let res = self
.uring .uring
.0 .0
@ -301,7 +301,7 @@ impl AsyncReadLoan for TcpStream {
} }
impl AsyncWriteLoan for TcpStream { impl AsyncWriteLoan for TcpStream {
async fn write<B: crate::io::Buffer>(&mut self, buf: B) -> (B, IoResult<usize>) { async fn write<B: Buffer>(&mut self, buf: B) -> (B, IoResult<usize>) {
let res = self let res = self
.uring .uring
.0 .0

View file

@ -22,7 +22,7 @@ impl<I: IoImpl> AsyncReadLoan for TcpStream<I>
where where
I::TcpStream: AsyncReadLoan, I::TcpStream: AsyncReadLoan,
{ {
async fn read<B: BufferMut>(&mut self, buf: B) -> (B, IoResult<usize>) { async fn read_no_resize<B: BufferMut>(&mut self, buf: B) -> (B, IoResult<usize>) {
self.0.read(buf).await self.0.read(buf).await
} }
} }