httplz/crates/fetchplz/src/lib.rs
2024-10-18 00:29:46 -04:00

182 lines
3.9 KiB
Rust

use std::io::{Error as IOError, Read, Write};
use std::net::TcpStream;
use std::str::FromStr;
use httplz::NeedsMoreData;
use httplzx::ToEvents;
/// Streaming HTTP response body that is read lazily from the underlying
/// TCP connection through its [`Read`] implementation.
pub struct Body {
/// Bytes received from `stream` that have not been discarded yet; the
/// current body chunk, when there is one, sits at the front.
buf: httplz::Buf<Box<[u8]>, u8>,
/// Length of the current body chunk at the front of `buf`
/// (0 when no chunk is pending).
chunk_end: usize,
/// Number of bytes of the current chunk already handed out by `read`.
read_amt: usize,
/// Socket from which more response bytes are pulled when the parser
/// reports that it needs more data.
stream: TcpStream,
/// `httplz` connection state that is polled for receive events.
conn: httplz::Connection,
}
impl Body {
    /// Discards the chunk that has just been consumed and advances to the
    /// next body chunk.
    ///
    /// On return, `chunk_end` holds the length of the next chunk located at
    /// the front of `buf` and `read_amt` is reset to 0. If the connection
    /// stops receiving before another chunk is found, both stay at 0, which
    /// makes the next `read` report end-of-stream.
    ///
    /// # Errors
    ///
    /// Returns any I/O error raised while reading from the socket. Protocol
    /// errors reported by `httplz` are wrapped with [`IOError::other`] using
    /// their `Debug` representation.
    fn find_next_chunk_sz(&mut self) -> Result<(), IOError> {
        // Drop the previous chunk from the buffer and reset the read cursor.
        self.buf.pop_front(self.chunk_end);
        self.chunk_end = 0;
        self.read_amt = 0;

        while self.conn.is_recving() {
            let res = self.conn.poll_recv(self.buf.filled());
            if res.needs_more_data() {
                // NOTE(review): if `read_from` can succeed with zero bytes
                // (peer closed early, or buffer full) this loop would spin;
                // confirm against `httplz::Buf::read_from`.
                self.buf.read_from(&mut self.stream)?;
                continue;
            }

            let (amt, ev) = res.map_err(|e| IOError::other(format!("{e:?}")))?;
            match ev {
                httplz::Event::BodyChunk(b) => {
                    // Leave the chunk in `buf`; `read` copies out of it and
                    // the next call to this method pops it.
                    self.chunk_end = b.len();
                    return Ok(());
                },
                // Any other event carries no body bytes: just consume it.
                _ => self.buf.pop_front(amt),
            };
        }

        Ok(())
    }
}
impl Read for Body {
fn read(&mut self, mut buf: &mut [u8]) -> std::io::Result<usize> {
if self.chunk_end == self.read_amt {
self.find_next_chunk_sz()?;
}
let chunk = &self.buf.filled()[self.read_amt..self.chunk_end];
let amt = buf.write(chunk)?;
self.read_amt += amt;
Ok(amt)
}
}
impl Body {
    /// Reads the rest of the body to completion and returns it as a single
    /// byte vector.
    ///
    /// # Errors
    ///
    /// Returns the first I/O error encountered while reading the body.
    pub fn collect_bytes(mut self) -> Result<Vec<u8>, IOError> {
        // `read_to_end` pulls whole chunks per call instead of driving one
        // `read` call per byte the way `bytes().collect()` does.
        let mut bytes = Vec::new();
        self.read_to_end(&mut bytes)?;
        Ok(bytes)
    }
}
/// An [`http::Response`] whose body is a lazily-read [`Body`].
pub type Response = http::Response<Body>;
/// Errors that can occur while performing a request with [`fetch`].
#[derive(Debug)]
pub enum Error {
    /// Connecting to, writing to, or reading from the socket failed.
    IOError(IOError),
    /// The `httplz` connection rejected an outgoing event or failed to
    /// parse the incoming response.
    HttplzError(httplz::Error),
    /// The given string is not a valid URI, or the URI has no host.
    InvalidUri,
    /// The response status code could not be converted to an
    /// [`http::StatusCode`].
    InvalidStatusCode,
    /// A header value could not be converted to an [`http::HeaderValue`].
    InvalidHeaderValue,
}

impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::IOError(e) => write!(f, "I/O error: {e}"),
            // `httplz::Error` is only known to implement `Debug` here.
            Self::HttplzError(e) => write!(f, "HTTP protocol error: {e:?}"),
            Self::InvalidUri => f.write_str("invalid URI"),
            Self::InvalidStatusCode => f.write_str("invalid status code"),
            Self::InvalidHeaderValue => f.write_str("invalid header value"),
        }
    }
}

impl std::error::Error for Error {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::IOError(e) => Some(e),
            Self::HttplzError(_)
            | Self::InvalidUri
            | Self::InvalidStatusCode
            | Self::InvalidHeaderValue => None,
        }
    }
}
impl From<IOError> for Error {
fn from(value: IOError) -> Self {
Self::IOError(value)
}
}
impl From<httplz::Error> for Error {
fn from(value: httplz::Error) -> Self {
Self::HttplzError(value)
}
}
pub fn fetch(path: &str) -> Result<Response, Error> {
let uri = http::Uri::from_str(path).map_err(|_| Error::InvalidUri)?;
let scheme = uri.scheme().cloned().unwrap_or(http::uri::Scheme::HTTPS);
let host = uri.host().ok_or(Error::InvalidUri)?;
let mut stream = TcpStream::connect(format!(
"{}:{}",
host,
uri.port_u16().unwrap_or_else(|| match scheme.as_str() {
"http" => 80,
_ => 443,
})
))?;
let mut conn = httplz::Connection::new(httplz::Role::Client);
let (mut parts, _) = http::Request::new(()).into_parts();
parts
.headers
.insert("Host", http::HeaderValue::from_str(host).unwrap());
parts.uri = uri;
let mut events = parts.to_events();
events.extend_from_slice(&[httplz::Event::HeadersDone, httplz::Event::SendDone]);
dbg!("a");
let mut buf = httplz::Buf::new(vec![0; 4096].into_boxed_slice());
for event in events {
conn.handle_send(&event, &mut buf)?;
}
stream.write_all(buf.filled())?;
assert!(conn.is_recving());
buf.clear();
let (mut parts, _) = http::Response::new(()).into_parts();
while conn.is_recving() {
let data = buf.filled();
let res = conn.poll_recv(data);
if res.needs_more_data() {
buf.read_from(&mut stream)?;
continue;
}
let (amt, event) = res?;
let mut brk = false;
match event {
httplz::Event::StatusLine(sl) => {
parts.status = http::StatusCode::from_u16(sl.status_code)
.map_err(|_| Error::InvalidStatusCode)?;
parts.version = http::Version::HTTP_11;
},
httplz::Event::Header(h) => {
parts.headers.insert(
http::HeaderName::from_str(h.name).unwrap(),
http::HeaderValue::from_bytes(h.value)
.map_err(|_| Error::InvalidHeaderValue)?,
);
},
httplz::Event::HeadersDone => brk = true,
_ => (),
};
dbg!(amt, brk);
buf.pop_front(amt);
if brk {
break;
}
}
Ok(http::Response::from_parts(
parts,
Body {
buf,
stream,
conn,
chunk_end: 0,
read_amt: 0,
},
))
}
#[cfg(test)]
mod test_fetch {
    use crate::fetch;

    /// Fetches `http://httpbin.org/get` and compares the body with the
    /// stored snapshot. Needs network access to httpbin.org.
    #[test]
    fn test_fetch_1() {
        let response = fetch("http://httpbin.org/get").unwrap();
        let raw = response.into_body().collect_bytes().unwrap();
        let text = String::from_utf8_lossy(&raw);
        assert_eq!(
            text,
            include_str!("../testing/snapshots/httpbin_get_empty.txt")
        );
    }
}