From 21e8b9c8bbba563d8333d1dd415578feb982eaf2 Mon Sep 17 00:00:00 2001 From: soup Date: Thu, 24 Oct 2024 15:26:53 -0400 Subject: [PATCH] Work work --- Cargo.lock | 86 +++++++- crates/fetchplz/Cargo.toml | 4 + crates/fetchplz/src/lib.rs | 392 +++++++++++++++++++++++-------------- flake.nix | 12 +- src/lib.rs | 24 ++- src/parts.rs | 17 +- 6 files changed, 369 insertions(+), 166 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f29aa4b..d2d725b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,12 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 + +[[package]] +name = "bitflags" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" [[package]] name = "bytes" @@ -8,13 +14,27 @@ version = "1.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "428d9aa8fbc0670b7b8d6030a7fadd0f86151cae55e4dbbece15f3780a3dfaf3" +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "fastrand" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6" + [[package]] name = "fetchplz" version = "0.0.0" dependencies = [ + "futures-lite", "http", "httplz", "httplzx", + "wove", ] [[package]] @@ -23,6 +43,31 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "futures-core" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + +[[package]] +name = "futures-lite" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52527eb5074e35e9339c6b4e8d12600c7128b68fb25dcb9fa9dec18f7c25f3a5" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "parking", + "pin-project-lite", +] + [[package]] name = "http" version = "1.1.0" @@ -49,8 +94,47 @@ dependencies = [ "httplz", ] +[[package]] +name = "io-uring" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c9c844e08c94e8558389fb9b8944cb99fc697e231c975e4274b42bc99e0625b" +dependencies = [ + "bitflags", + "cfg-if", + "libc", +] + [[package]] name = "itoa" version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" + +[[package]] +name = "libc" +version = "0.2.161" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9489c2807c139ffd9c1794f4af0ebe86a828db53ecdc7fea2111d0fed085d1" + +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + +[[package]] +name = "pin-project-lite" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" + +[[package]] +name = "wove" +version = "0.0.0" +dependencies = [ + "futures-lite", + "io-uring", + "libc", + "parking", +] diff --git a/crates/fetchplz/Cargo.toml b/crates/fetchplz/Cargo.toml index 6e6369f..f654d28 100644 --- a/crates/fetchplz/Cargo.toml +++ b/crates/fetchplz/Cargo.toml @@ -6,4 +6,8 @@ edition = "2021" [dependencies] httplz = { workspace = true } httplzx = { workspace = true } + http = { version = "1.1.0" } +futures-lite = { version = "2.3.0" } + +wove = { path = "../../../wove" } diff --git a/crates/fetchplz/src/lib.rs b/crates/fetchplz/src/lib.rs index 87122e3..6397975 100644 --- a/crates/fetchplz/src/lib.rs +++ b/crates/fetchplz/src/lib.rs @@ -1,181 +1,279 @@ -use std::io::{Error as IOError, Read, Write}; -use std::net::TcpStream; -use std::str::FromStr; +use std::future::Future; +use std::io::Error as IoError; -use httplz::NeedsMoreData; -use httplzx::ToEvents; +use futures_lite::{Stream, StreamExt}; +use http::uri::Scheme; -pub struct Body { - buf: httplz::Buf, u8>, - chunk_end: usize, - read_amt: usize, - stream: TcpStream, - conn: httplz::Connection, -} -impl Body { - fn find_next_chunk_sz(&mut self) -> Result<(), IOError> { - self.buf.pop_front(self.chunk_end); - self.chunk_end = 0; - self.read_amt = 0; +use http::{HeaderValue, Response}; +use wove::io::{AsyncReadLoan, AsyncWriteLoan, Transpose}; +use wove::io_impl::IoImpl; +use wove::util::Ignore; - while self.conn.is_recving() { - dbg!(self.conn, String::from_utf8_lossy(self.buf.filled())); - let res = self.conn.poll_recv(self.buf.filled()); - if res.needs_more_data() { - self.buf.read_from(&mut self.stream)?; - continue; - } - - let (amt, ev) = res.map_err(|e| IOError::other(format!("{e:?}")))?; - match ev { - httplz::Event::BodyChunk(b) => { - self.chunk_end = b.len(); - return Ok(()); - }, - _ => self.buf.pop_front(amt), - }; - } - - Ok(()) - } -} -impl Read for Body { - fn read(&mut self, mut buf: &mut [u8]) -> std::io::Result { - if self.chunk_end == self.read_amt { - self.find_next_chunk_sz()?; - } - - let chunk = &self.buf.filled()[self.read_amt..self.chunk_end]; - let amt = buf.write(chunk)?; - self.read_amt += amt; - - Ok(amt) - } -} -impl Body { - pub fn collect_bytes(self) -> Result, IOError> { - self.bytes().collect() - } -} - -pub type Response = http::Response; #[derive(Debug)] pub enum Error { - IOError(IOError), - HttplzError(httplz::Error), - InvalidUri, + InvalidUriScheme, InvalidStatusCode, InvalidHeaderValue, + InvalidHeaderName, + IoError(IoError), + Httplz(httplz::Error), } -impl From for Error { - fn from(value: IOError) -> Self { - Self::IOError(value) +impl From for Error { + fn from(value: IoError) -> Self { + Self::IoError(value) } } + impl From for Error { fn from(value: httplz::Error) -> Self { - Self::HttplzError(value) + Self::Httplz(value) } } -pub fn fetch(path: &str) -> Result { - let uri = http::Uri::from_str(path).map_err(|_| Error::InvalidUri)?; - let scheme = uri.scheme().cloned().unwrap_or(http::uri::Scheme::HTTPS); - let host = uri.host().ok_or(Error::InvalidUri)?; - let mut stream = TcpStream::connect(format!( - "{}:{}", - host, - uri.port_u16().unwrap_or_else(|| match scheme.as_str() { - "http" => 80, - _ => 443, - }) - ))?; - let mut conn = httplz::Connection::new(httplz::Role::Client); - - let (mut parts, _) = http::Request::new(()).into_parts(); - parts - .headers - .insert("Host", http::HeaderValue::from_str(host).unwrap()); - parts.uri = uri; - - let mut events = parts.to_events(); - events.extend_from_slice(&[httplz::Event::HeadersDone, httplz::Event::SendDone]); - - dbg!("a"); - let mut buf = httplz::Buf::new(vec![0; 4096].into_boxed_slice()); - for event in events { - conn.handle_send(&event, &mut buf)?; +impl From for Error { + fn from(_: http::status::InvalidStatusCode) -> Self { + Self::InvalidStatusCode } +} - stream.write_all(buf.filled())?; +impl From for Error { + fn from(_: http::header::InvalidHeaderValue) -> Self { + Self::InvalidHeaderValue + } +} - assert!(conn.is_recving()); +impl From for Error { + fn from(_: http::header::InvalidHeaderName) -> Self { + Self::InvalidHeaderName + } +} - buf.clear(); - let (mut parts, _) = http::Response::new(()).into_parts(); +pub type FetchResult = Result; - while conn.is_recving() { - let data = buf.filled(); - let res = conn.poll_recv(data); - if res.needs_more_data() { - buf.read_from(&mut stream)?; - continue; +pub struct Body { + pub size_hint: Option, + pub stream: S, +} + +pub type FetchResponse = http::Response>; +pub type FetchRequest = http::Request>; + +pub trait RequestExt { + fn send( + self, + io: &I, + ) -> impl Future< + Output = FetchResult, IoError>>>>, + >; +} + +impl RequestExt for http::Request> +where + S: Stream>, + I: IoImpl, + I::TcpStream: AsyncReadLoan + AsyncWriteLoan, +{ + async fn send( + self, + io: &I, + ) -> FetchResult, IoError>>>> { + let uri = self.uri(); + let host = uri.host().unwrap_or("localhost").to_string(); + let port = self + .uri() + .port() + .map(|p| Ok(p.as_u16())) + .or_else(|| match uri.scheme() { + Some(s) if *s == Scheme::HTTP => Some(Ok(80)), + Some(s) if *s == Scheme::HTTPS => Some(Ok(443)), + _ => Some(Err(Error::InvalidUriScheme)), + }) + .unwrap_or(Err(Error::InvalidUriScheme))?; + + let mut stream = wove::net::TcpStream::connect(io, (host.as_str(), port)).await?; + + let uri = uri.to_string(); + let (parts, body) = self.into_parts(); + let mut events: Vec = vec![ + httplz::RequestLine { + method: httplz::Method::new_from_str(parts.method.as_str()), + target: parts + .uri + .path_and_query() + .map(|p| p.as_str()) + .unwrap_or("/"), + version: httplz::Version::HTTP1_1, + } + .into(), + httplz::Header { + name: "Location", + value: uri.as_bytes(), + } + .into(), + httplz::Header { + name: "Host", + value: host.as_bytes(), + } + .into(), + ]; + + for header in parts.headers.iter() { + events.push( + httplz::Header { + name: header.0.as_str(), + value: header.1.as_bytes(), + } + .into(), + ) } - let (amt, event) = res?; - let mut brk = false; - match event { - httplz::Event::StatusLine(sl) => { - parts.status = http::StatusCode::from_u16(sl.status_code) - .map_err(|_| Error::InvalidStatusCode)?; - parts.version = http::Version::HTTP_11; - }, - httplz::Event::Header(h) => { - parts.headers.insert( - http::HeaderName::from_str(h.name).unwrap(), - http::HeaderValue::from_bytes(h.value) - .map_err(|_| Error::InvalidHeaderValue)?, - ); - }, - httplz::Event::HeadersDone => brk = true, - _ => (), - }; - dbg!(amt, brk); - buf.pop_front(amt); + let mut conn = httplz::Connection::new(httplz::Role::Client); - if brk { - break; + let buf = vec![0; 4096]; + let mut buf = httplz::Buf::new(buf); + for event in events.iter() { + loop { + match conn.handle_send(event, &mut buf) { + Ok(_) => break, + Err(httplz::Error { + kind: httplz::ErrorKind::BufNotBigEnough, + .. + }) => { + stream.write(Vec::from(buf.filled())).await.transpose()?; + buf.clear(); + }, + Err(e) => { + return Err(Error::Httplz(e)); + }, + } + } } - } + eprintln!("{}", String::from_utf8_lossy(buf.filled())); + stream.write(Vec::from(buf.filled())).await.transpose()?; + buf.clear(); - Ok(http::Response::from_parts( - parts, - Body { - buf, - stream, - conn, - chunk_end: 0, - read_amt: 0, - }, - )) + match body.size_hint { + Some(len) => conn.handle_send( + &httplz::Header { + name: "Content-Length", + value: format!("{len}").as_bytes(), + } + .into(), + &mut buf, + )?, + None => todo!(), + } + conn.handle_send(&httplz::Event::HeadersDone, &mut buf)?; + stream.write(Vec::from(buf.filled())).await.transpose()?; + + let (mut parts, _) = Response::new(()).into_parts(); + + stream + .read_stream() + .map(|r| r.map_err(Error::IoError)) + .scan(Vec::new(), |leftovers, r| { + r.and_then(|data| { + leftovers.extend_from_slice(&data); + let mut read_total = 0; + for res in conn.poll_recv_iter(leftovers) { + let (read, event) = res?; + read_total += read; + dbg!(event); + + match event { + httplz::Event::StatusLine(sl) => { + parts.status = http::StatusCode::from_u16(sl.status_code)?; + parts.version = match sl.version { + httplz::Version::HTTP1_1 => http::Version::HTTP_11, + } + }, + httplz::Event::Header(httplz::Header { name, value }) => { + parts.headers.insert( + name.parse::()?, + HeaderValue::from_bytes(value)?, + ); + }, + httplz::Event::HeadersDone => return Ok(None), + _ => (), + } + } + + leftovers.drain(0..read_total); + + FetchResult::Ok(Some(())) + }) + .transpose() + }) + .fuse() + .collect::() + .await; + + Ok(Response::from_parts( + parts, + Body { + size_hint: None, + stream: stream + .into_read_stream() + .map(|r| r.map(|v| v.into_boxed_slice())), + }, + )) + } } #[cfg(test)] -mod test_fetch { - use crate::fetch; +mod test { + use futures_lite::StreamExt; + use http::Request; + use wove::{ + io::{AsyncReadLoan, AsyncWriteLoan, Transpose}, + io_impl::io_uring::IoUring, + streams::StreamExt as _, + }; + + use crate::{Body, RequestExt}; #[test] - fn test_fetch_1() { - let resp = fetch("http://httpbin.org/get") - .unwrap() - .map(|b| b.collect_bytes().unwrap()); - let body = resp.into_body(); + fn test() { + let data = "Hello, world"; + let request = Request::builder() + .method(http::method::Method::POST) + .uri("http://google.com") + .body(Body { + size_hint: Some(data.len()), + stream: futures_lite::stream::once( + data.to_string().into_bytes().into_boxed_slice(), + ), + }) + .unwrap(); - let body = String::from_utf8_lossy(&body); + let uring = &IoUring::new().unwrap(); + uring.block_on(async { + let response = request.send(uring).await.unwrap(); + let (parts, body) = response.into_parts(); + dbg!(parts); - assert_eq!( - body, - include_str!("../testing/snapshots/httpbin_get_empty.txt") - ) + let body: Vec = body.stream.try_collect_chunks().await.unwrap(); + dbg!(String::from_utf8_lossy(&body)); + let mut test = wove::net::TcpStream::connect(uring, "google.com:80") + .await + .unwrap(); + + test.write("GET / HTTP/1.1\r\n\r\n".to_string()) + .await + .transpose() + .unwrap(); + + let body: Vec = test + .read_stream() + .inspect(|v| { + dbg!(v.as_ref().map(|v| String::from_utf8_lossy(v.as_ref()))); + }) + .fuse() + .try_collect_chunks() + .await + .unwrap(); + + dbg!(String::from_utf8_lossy(&body)); + }); } } diff --git a/flake.nix b/flake.nix index 61d666b..bdae61f 100644 --- a/flake.nix +++ b/flake.nix @@ -21,7 +21,8 @@ pkgs = i.nixpkgs.legacyPackages.${system}; pkgsDeno = i.deno-flake.packages.${system}; pkgsFenix = i.fenix.packages.${system}; - nightly = pkgsFenix.default; + minimal = pkgsFenix.minimal; + complete = pkgsFenix.complete; stable = pkgsFenix.stable; in { devShells.default = pkgs.mkShell { @@ -34,11 +35,10 @@ pkgs.fd (pkgsFenix.combine [ - (stable.withComponents [ - "cargo" "rustc" "rust-src" "rust-analyzer" "clippy" - ]) - nightly.rustfmt - ]) + (complete.withComponents [ "rust-src" "rust-analyzer" "rustfmt" "clippy" ]) + (minimal.withComponents [ "cargo" "rustc" "rust-std" ]) + ]) + pkgs.cargo-watch ]; }; diff --git a/src/lib.rs b/src/lib.rs index 306958a..f9d2ceb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -43,7 +43,7 @@ impl<'a> From> for Event<'a> { } } -impl<'a> core::fmt::Display for Event<'a> { +impl core::fmt::Display for Event<'_> { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { match self { Event::Empty | Event::HeadersDone | Event::RecvDone | Event::SendDone => Ok(()), @@ -221,6 +221,10 @@ impl Connection { }) } + pub fn poll_recv_iter<'a>(&'a mut self, bytes: &'a [u8]) -> PollRecvIter<'a> { + PollRecvIter(self, bytes) + } + pub fn handle_send(&mut self, event: &Event, mut w: impl Write) -> Written { let state = match self.state { StateConnection::Send(s) => s, @@ -295,3 +299,21 @@ impl Connection { Ok(()) } } + +pub struct PollRecvIter<'a>(&'a mut Connection, &'a [u8]); +impl<'a> Iterator for PollRecvIter<'a> { + type Item = Parse<'a, Event<'a>>; + + fn next(&mut self) -> Option { + let res = self.0.poll_recv(self.1); + if let Err(Error { + kind: ErrorKind::NeedMoreData, + .. + }) = res + { + return None; + } + + Some(res) + } +} diff --git a/src/parts.rs b/src/parts.rs index 9bf36dc..26908e0 100644 --- a/src/parts.rs +++ b/src/parts.rs @@ -40,7 +40,7 @@ impl<'a> Method<'a> { } } -impl<'a> core::fmt::Display for Method<'a> { +impl core::fmt::Display for Method<'_> { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { use Method::*; @@ -80,7 +80,7 @@ pub struct Header<'a> { pub name: &'a str, pub value: &'a [u8], } -impl<'a> Header<'a> { +impl Header<'_> { pub fn special(&self) -> Option { let Self { name, value } = self; @@ -90,15 +90,10 @@ impl<'a> Header<'a> { { Some(HeaderSpecial::TransferEncodingChunked) }, - _ if name.eq_ignore_ascii_case("content-length") => { - match core::str::from_utf8(value) - .ok() - .and_then(|s| s.parse().ok()) - { - Some(v) => Some(HeaderSpecial::ContentLength(v)), - _ => None, - } - }, + _ if name.eq_ignore_ascii_case("content-length") => core::str::from_utf8(value) + .ok() + .and_then(|s| s.parse().ok()) + .map(HeaderSpecial::ContentLength), _ => None, } }