Work work

This commit is contained in:
soup 2024-10-24 15:26:53 -04:00
parent da94c17fe8
commit 21e8b9c8bb
No known key found for this signature in database
6 changed files with 369 additions and 166 deletions

86
Cargo.lock generated
View file

@ -1,6 +1,12 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
version = 4
[[package]]
name = "bitflags"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de"
[[package]]
name = "bytes"
@ -8,13 +14,27 @@ version = "1.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "428d9aa8fbc0670b7b8d6030a7fadd0f86151cae55e4dbbece15f3780a3dfaf3"
[[package]]
name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "fastrand"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6"
[[package]]
name = "fetchplz"
version = "0.0.0"
dependencies = [
"futures-lite",
"http",
"httplz",
"httplzx",
"wove",
]
[[package]]
@ -23,6 +43,31 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "futures-core"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
[[package]]
name = "futures-io"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
[[package]]
name = "futures-lite"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52527eb5074e35e9339c6b4e8d12600c7128b68fb25dcb9fa9dec18f7c25f3a5"
dependencies = [
"fastrand",
"futures-core",
"futures-io",
"parking",
"pin-project-lite",
]
[[package]]
name = "http"
version = "1.1.0"
@ -49,8 +94,47 @@ dependencies = [
"httplz",
]
[[package]]
name = "io-uring"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c9c844e08c94e8558389fb9b8944cb99fc697e231c975e4274b42bc99e0625b"
dependencies = [
"bitflags",
"cfg-if",
"libc",
]
[[package]]
name = "itoa"
version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b"
[[package]]
name = "libc"
version = "0.2.161"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e9489c2807c139ffd9c1794f4af0ebe86a828db53ecdc7fea2111d0fed085d1"
[[package]]
name = "parking"
version = "2.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba"
[[package]]
name = "pin-project-lite"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02"
[[package]]
name = "wove"
version = "0.0.0"
dependencies = [
"futures-lite",
"io-uring",
"libc",
"parking",
]

View file

@ -6,4 +6,8 @@ edition = "2021"
[dependencies]
httplz = { workspace = true }
httplzx = { workspace = true }
http = { version = "1.1.0" }
futures-lite = { version = "2.3.0" }
wove = { path = "../../../wove" }

View file

@ -1,181 +1,279 @@
use std::io::{Error as IOError, Read, Write};
use std::net::TcpStream;
use std::str::FromStr;
use std::future::Future;
use std::io::Error as IoError;
use httplz::NeedsMoreData;
use httplzx::ToEvents;
use futures_lite::{Stream, StreamExt};
use http::uri::Scheme;
pub struct Body {
buf: httplz::Buf<Box<[u8]>, u8>,
chunk_end: usize,
read_amt: usize,
stream: TcpStream,
conn: httplz::Connection,
}
impl Body {
fn find_next_chunk_sz(&mut self) -> Result<(), IOError> {
self.buf.pop_front(self.chunk_end);
self.chunk_end = 0;
self.read_amt = 0;
use http::{HeaderValue, Response};
use wove::io::{AsyncReadLoan, AsyncWriteLoan, Transpose};
use wove::io_impl::IoImpl;
use wove::util::Ignore;
while self.conn.is_recving() {
dbg!(self.conn, String::from_utf8_lossy(self.buf.filled()));
let res = self.conn.poll_recv(self.buf.filled());
if res.needs_more_data() {
self.buf.read_from(&mut self.stream)?;
continue;
}
let (amt, ev) = res.map_err(|e| IOError::other(format!("{e:?}")))?;
match ev {
httplz::Event::BodyChunk(b) => {
self.chunk_end = b.len();
return Ok(());
},
_ => self.buf.pop_front(amt),
};
}
Ok(())
}
}
impl Read for Body {
fn read(&mut self, mut buf: &mut [u8]) -> std::io::Result<usize> {
if self.chunk_end == self.read_amt {
self.find_next_chunk_sz()?;
}
let chunk = &self.buf.filled()[self.read_amt..self.chunk_end];
let amt = buf.write(chunk)?;
self.read_amt += amt;
Ok(amt)
}
}
impl Body {
pub fn collect_bytes(self) -> Result<Vec<u8>, IOError> {
self.bytes().collect()
}
}
pub type Response = http::Response<Body>;
#[derive(Debug)]
pub enum Error {
IOError(IOError),
HttplzError(httplz::Error),
InvalidUri,
InvalidUriScheme,
InvalidStatusCode,
InvalidHeaderValue,
InvalidHeaderName,
IoError(IoError),
Httplz(httplz::Error),
}
impl From<IOError> for Error {
fn from(value: IOError) -> Self {
Self::IOError(value)
impl From<IoError> for Error {
fn from(value: IoError) -> Self {
Self::IoError(value)
}
}
impl From<httplz::Error> for Error {
fn from(value: httplz::Error) -> Self {
Self::HttplzError(value)
Self::Httplz(value)
}
}
pub fn fetch(path: &str) -> Result<Response, Error> {
let uri = http::Uri::from_str(path).map_err(|_| Error::InvalidUri)?;
let scheme = uri.scheme().cloned().unwrap_or(http::uri::Scheme::HTTPS);
let host = uri.host().ok_or(Error::InvalidUri)?;
let mut stream = TcpStream::connect(format!(
"{}:{}",
host,
uri.port_u16().unwrap_or_else(|| match scheme.as_str() {
"http" => 80,
_ => 443,
})
))?;
let mut conn = httplz::Connection::new(httplz::Role::Client);
let (mut parts, _) = http::Request::new(()).into_parts();
parts
.headers
.insert("Host", http::HeaderValue::from_str(host).unwrap());
parts.uri = uri;
let mut events = parts.to_events();
events.extend_from_slice(&[httplz::Event::HeadersDone, httplz::Event::SendDone]);
dbg!("a");
let mut buf = httplz::Buf::new(vec![0; 4096].into_boxed_slice());
for event in events {
conn.handle_send(&event, &mut buf)?;
impl From<http::status::InvalidStatusCode> for Error {
fn from(_: http::status::InvalidStatusCode) -> Self {
Self::InvalidStatusCode
}
}
stream.write_all(buf.filled())?;
impl From<http::header::InvalidHeaderValue> for Error {
fn from(_: http::header::InvalidHeaderValue) -> Self {
Self::InvalidHeaderValue
}
}
assert!(conn.is_recving());
impl From<http::header::InvalidHeaderName> for Error {
fn from(_: http::header::InvalidHeaderName) -> Self {
Self::InvalidHeaderName
}
}
buf.clear();
let (mut parts, _) = http::Response::new(()).into_parts();
pub type FetchResult<T> = Result<T, Error>;
while conn.is_recving() {
let data = buf.filled();
let res = conn.poll_recv(data);
if res.needs_more_data() {
buf.read_from(&mut stream)?;
continue;
pub struct Body<S> {
pub size_hint: Option<usize>,
pub stream: S,
}
pub type FetchResponse<S> = http::Response<Body<S>>;
pub type FetchRequest<S> = http::Request<Body<S>>;
pub trait RequestExt<S, I> {
fn send(
self,
io: &I,
) -> impl Future<
Output = FetchResult<FetchResponse<impl Stream<Item = Result<Box<[u8]>, IoError>>>>,
>;
}
impl<S, I: IoImpl> RequestExt<S, I> for http::Request<Body<S>>
where
S: Stream<Item = Box<[u8]>>,
I: IoImpl,
I::TcpStream: AsyncReadLoan + AsyncWriteLoan,
{
async fn send(
self,
io: &I,
) -> FetchResult<FetchResponse<impl Stream<Item = Result<Box<[u8]>, IoError>>>> {
let uri = self.uri();
let host = uri.host().unwrap_or("localhost").to_string();
let port = self
.uri()
.port()
.map(|p| Ok(p.as_u16()))
.or_else(|| match uri.scheme() {
Some(s) if *s == Scheme::HTTP => Some(Ok(80)),
Some(s) if *s == Scheme::HTTPS => Some(Ok(443)),
_ => Some(Err(Error::InvalidUriScheme)),
})
.unwrap_or(Err(Error::InvalidUriScheme))?;
let mut stream = wove::net::TcpStream::connect(io, (host.as_str(), port)).await?;
let uri = uri.to_string();
let (parts, body) = self.into_parts();
let mut events: Vec<httplz::Event> = vec![
httplz::RequestLine {
method: httplz::Method::new_from_str(parts.method.as_str()),
target: parts
.uri
.path_and_query()
.map(|p| p.as_str())
.unwrap_or("/"),
version: httplz::Version::HTTP1_1,
}
.into(),
httplz::Header {
name: "Location",
value: uri.as_bytes(),
}
.into(),
httplz::Header {
name: "Host",
value: host.as_bytes(),
}
.into(),
];
for header in parts.headers.iter() {
events.push(
httplz::Header {
name: header.0.as_str(),
value: header.1.as_bytes(),
}
.into(),
)
}
let (amt, event) = res?;
let mut brk = false;
match event {
httplz::Event::StatusLine(sl) => {
parts.status = http::StatusCode::from_u16(sl.status_code)
.map_err(|_| Error::InvalidStatusCode)?;
parts.version = http::Version::HTTP_11;
},
httplz::Event::Header(h) => {
parts.headers.insert(
http::HeaderName::from_str(h.name).unwrap(),
http::HeaderValue::from_bytes(h.value)
.map_err(|_| Error::InvalidHeaderValue)?,
);
},
httplz::Event::HeadersDone => brk = true,
_ => (),
};
dbg!(amt, brk);
buf.pop_front(amt);
let mut conn = httplz::Connection::new(httplz::Role::Client);
if brk {
break;
let buf = vec![0; 4096];
let mut buf = httplz::Buf::new(buf);
for event in events.iter() {
loop {
match conn.handle_send(event, &mut buf) {
Ok(_) => break,
Err(httplz::Error {
kind: httplz::ErrorKind::BufNotBigEnough,
..
}) => {
stream.write(Vec::from(buf.filled())).await.transpose()?;
buf.clear();
},
Err(e) => {
return Err(Error::Httplz(e));
},
}
}
}
}
eprintln!("{}", String::from_utf8_lossy(buf.filled()));
stream.write(Vec::from(buf.filled())).await.transpose()?;
buf.clear();
Ok(http::Response::from_parts(
parts,
Body {
buf,
stream,
conn,
chunk_end: 0,
read_amt: 0,
},
))
match body.size_hint {
Some(len) => conn.handle_send(
&httplz::Header {
name: "Content-Length",
value: format!("{len}").as_bytes(),
}
.into(),
&mut buf,
)?,
None => todo!(),
}
conn.handle_send(&httplz::Event::HeadersDone, &mut buf)?;
stream.write(Vec::from(buf.filled())).await.transpose()?;
let (mut parts, _) = Response::new(()).into_parts();
stream
.read_stream()
.map(|r| r.map_err(Error::IoError))
.scan(Vec::new(), |leftovers, r| {
r.and_then(|data| {
leftovers.extend_from_slice(&data);
let mut read_total = 0;
for res in conn.poll_recv_iter(leftovers) {
let (read, event) = res?;
read_total += read;
dbg!(event);
match event {
httplz::Event::StatusLine(sl) => {
parts.status = http::StatusCode::from_u16(sl.status_code)?;
parts.version = match sl.version {
httplz::Version::HTTP1_1 => http::Version::HTTP_11,
}
},
httplz::Event::Header(httplz::Header { name, value }) => {
parts.headers.insert(
name.parse::<http::HeaderName>()?,
HeaderValue::from_bytes(value)?,
);
},
httplz::Event::HeadersDone => return Ok(None),
_ => (),
}
}
leftovers.drain(0..read_total);
FetchResult::Ok(Some(()))
})
.transpose()
})
.fuse()
.collect::<Ignore>()
.await;
Ok(Response::from_parts(
parts,
Body {
size_hint: None,
stream: stream
.into_read_stream()
.map(|r| r.map(|v| v.into_boxed_slice())),
},
))
}
}
#[cfg(test)]
mod test_fetch {
use crate::fetch;
mod test {
use futures_lite::StreamExt;
use http::Request;
use wove::{
io::{AsyncReadLoan, AsyncWriteLoan, Transpose},
io_impl::io_uring::IoUring,
streams::StreamExt as _,
};
use crate::{Body, RequestExt};
#[test]
fn test_fetch_1() {
let resp = fetch("http://httpbin.org/get")
.unwrap()
.map(|b| b.collect_bytes().unwrap());
let body = resp.into_body();
fn test() {
let data = "Hello, world";
let request = Request::builder()
.method(http::method::Method::POST)
.uri("http://google.com")
.body(Body {
size_hint: Some(data.len()),
stream: futures_lite::stream::once(
data.to_string().into_bytes().into_boxed_slice(),
),
})
.unwrap();
let body = String::from_utf8_lossy(&body);
let uring = &IoUring::new().unwrap();
uring.block_on(async {
let response = request.send(uring).await.unwrap();
let (parts, body) = response.into_parts();
dbg!(parts);
assert_eq!(
body,
include_str!("../testing/snapshots/httpbin_get_empty.txt")
)
let body: Vec<u8> = body.stream.try_collect_chunks().await.unwrap();
dbg!(String::from_utf8_lossy(&body));
let mut test = wove::net::TcpStream::connect(uring, "google.com:80")
.await
.unwrap();
test.write("GET / HTTP/1.1\r\n\r\n".to_string())
.await
.transpose()
.unwrap();
let body: Vec<u8> = test
.read_stream()
.inspect(|v| {
dbg!(v.as_ref().map(|v| String::from_utf8_lossy(v.as_ref())));
})
.fuse()
.try_collect_chunks()
.await
.unwrap();
dbg!(String::from_utf8_lossy(&body));
});
}
}

View file

@ -21,7 +21,8 @@
pkgs = i.nixpkgs.legacyPackages.${system};
pkgsDeno = i.deno-flake.packages.${system};
pkgsFenix = i.fenix.packages.${system};
nightly = pkgsFenix.default;
minimal = pkgsFenix.minimal;
complete = pkgsFenix.complete;
stable = pkgsFenix.stable;
in {
devShells.default = pkgs.mkShell {
@ -34,11 +35,10 @@
pkgs.fd
(pkgsFenix.combine [
(stable.withComponents [
"cargo" "rustc" "rust-src" "rust-analyzer" "clippy"
])
nightly.rustfmt
])
(complete.withComponents [ "rust-src" "rust-analyzer" "rustfmt" "clippy" ])
(minimal.withComponents [ "cargo" "rustc" "rust-std" ])
])
pkgs.cargo-watch
];
};

View file

@ -43,7 +43,7 @@ impl<'a> From<StatusLine<'a>> for Event<'a> {
}
}
impl<'a> core::fmt::Display for Event<'a> {
impl core::fmt::Display for Event<'_> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
Event::Empty | Event::HeadersDone | Event::RecvDone | Event::SendDone => Ok(()),
@ -221,6 +221,10 @@ impl Connection {
})
}
pub fn poll_recv_iter<'a>(&'a mut self, bytes: &'a [u8]) -> PollRecvIter<'a> {
PollRecvIter(self, bytes)
}
pub fn handle_send(&mut self, event: &Event, mut w: impl Write) -> Written {
let state = match self.state {
StateConnection::Send(s) => s,
@ -295,3 +299,21 @@ impl Connection {
Ok(())
}
}
pub struct PollRecvIter<'a>(&'a mut Connection, &'a [u8]);
impl<'a> Iterator for PollRecvIter<'a> {
type Item = Parse<'a, Event<'a>>;
fn next(&mut self) -> Option<Self::Item> {
let res = self.0.poll_recv(self.1);
if let Err(Error {
kind: ErrorKind::NeedMoreData,
..
}) = res
{
return None;
}
Some(res)
}
}

View file

@ -40,7 +40,7 @@ impl<'a> Method<'a> {
}
}
impl<'a> core::fmt::Display for Method<'a> {
impl core::fmt::Display for Method<'_> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
use Method::*;
@ -80,7 +80,7 @@ pub struct Header<'a> {
pub name: &'a str,
pub value: &'a [u8],
}
impl<'a> Header<'a> {
impl Header<'_> {
pub fn special(&self) -> Option<HeaderSpecial> {
let Self { name, value } = self;
@ -90,15 +90,10 @@ impl<'a> Header<'a> {
{
Some(HeaderSpecial::TransferEncodingChunked)
},
_ if name.eq_ignore_ascii_case("content-length") => {
match core::str::from_utf8(value)
.ok()
.and_then(|s| s.parse().ok())
{
Some(v) => Some(HeaderSpecial::ContentLength(v)),
_ => None,
}
},
_ if name.eq_ignore_ascii_case("content-length") => core::str::from_utf8(value)
.ok()
.and_then(|s| s.parse().ok())
.map(HeaderSpecial::ContentLength),
_ => None,
}
}