diff --git a/src/io.rs b/src/io.rs index 11254d9..c85a884 100644 --- a/src/io.rs +++ b/src/io.rs @@ -1,11 +1,12 @@ use futures_lite::Stream; use crate::aliases::IoResult; -use std::future::Future; +use std::{future::Future, mem::MaybeUninit}; pub trait BufferMut { fn as_mut_ptr(&mut self) -> *mut u8; fn writable_bytes(&self) -> usize; + fn resize_to_fit(&mut self, written_bytes: usize); } impl BufferMut for Vec { @@ -14,7 +15,29 @@ impl BufferMut for Vec { } fn writable_bytes(&self) -> usize { - self.len() + self.capacity() + } + + fn resize_to_fit(&mut self, written_bytes: usize) { + if written_bytes > self.capacity() { + panic!(); + } + unsafe { self.set_len(written_bytes) }; + } +} + +fn resize_box(bx: &mut Box<[T]>, size: usize) { + let mut bx_tmp = MaybeUninit::uninit(); + let bx_tmp_ptr: *mut Box<[T]> = bx_tmp.as_mut_ptr(); + let bx_tmp = unsafe { + bx_tmp_ptr.copy_from_nonoverlapping(bx as *const _, 1); + bx_tmp.assume_init() + }; + + let mut vec = bx_tmp.into_vec(); + vec.truncate(size); + unsafe { + (bx as *mut Box<[T]>).copy_from_nonoverlapping(&vec.into_boxed_slice(), 1); } } @@ -26,21 +49,43 @@ impl BufferMut for Box<[u8]> { fn writable_bytes(&self) -> usize { self.len() } + + fn resize_to_fit(&mut self, written_bytes: usize) { + resize_box(self, written_bytes) + } } pub trait AsyncReadLoan { - fn read(&mut self, buf: B) -> impl Future)>; + fn read_no_resize( + &mut self, + buf: B, + ) -> impl Future)>; + + fn read(&mut self, buf: B) -> impl Future)> { + async { + let (mut buf, amt) = self.read_no_resize(buf).await; + let (buf, amt) = match amt { + Ok(amt) => ( + { + buf.resize_to_fit(amt); + buf + }, + Ok(amt), + ), + Err(e) => (buf, Err(e)), + }; + + (buf, amt) + } + } fn read_stream(&mut self) -> impl Stream>> { futures_lite::stream::unfold(self, move |s| async move { - let (mut buf, res) = s.read(vec![0; 4096]).await; + let (buf, res) = s.read(vec![0; 4096]).await; match res { Err(e) => Some((Err(e), s)), Ok(0) => None, - Ok(read_amt) => { - buf.truncate(read_amt); - Some((Ok(buf), s)) - }, + Ok(_) => Some((Ok(buf), s)), } }) } diff --git a/src/io_impl/io_uring/mod.rs b/src/io_impl/io_uring/mod.rs index 061ec68..78310b8 100644 --- a/src/io_impl/io_uring/mod.rs +++ b/src/io_impl/io_uring/mod.rs @@ -16,7 +16,7 @@ use parking::Parker; use crate::{ aliases::IoResult, futures::{handoff, Get, GetFut, Put}, - io::{AsyncReadLoan, AsyncWriteLoan}, + io::{AsyncReadLoan, AsyncWriteLoan, Buffer, BufferMut}, }; use super::IoImpl; @@ -281,7 +281,7 @@ impl Drop for TcpStream { } } impl AsyncReadLoan for TcpStream { - async fn read(&mut self, mut buf: B) -> (B, IoResult) { + async fn read_no_resize(&mut self, mut buf: B) -> (B, IoResult) { let res = self .uring .0 @@ -301,7 +301,7 @@ impl AsyncReadLoan for TcpStream { } impl AsyncWriteLoan for TcpStream { - async fn write(&mut self, buf: B) -> (B, IoResult) { + async fn write(&mut self, buf: B) -> (B, IoResult) { let res = self .uring .0 diff --git a/src/net.rs b/src/net.rs index 11fed13..d8daba5 100644 --- a/src/net.rs +++ b/src/net.rs @@ -22,7 +22,7 @@ impl AsyncReadLoan for TcpStream where I::TcpStream: AsyncReadLoan, { - async fn read(&mut self, buf: B) -> (B, IoResult) { + async fn read_no_resize(&mut self, buf: B) -> (B, IoResult) { self.0.read(buf).await } }