Big library rework, but good shit is happening
This commit is contained in:
parent
0db4c4fa9c
commit
bdcf820ca7
8
Cargo.lock
generated
8
Cargo.lock
generated
|
|
@ -122,12 +122,6 @@ version = "0.2.14"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02"
|
||||
|
||||
[[package]]
|
||||
name = "pollster"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "22686f4785f02a4fcc856d3b3bb19bf6c8160d103f7a99cc258bddd0251dc7f2"
|
||||
|
||||
[[package]]
|
||||
name = "wove"
|
||||
version = "0.0.0"
|
||||
|
|
@ -136,5 +130,5 @@ dependencies = [
|
|||
"futures-lite",
|
||||
"io-uring",
|
||||
"libc",
|
||||
"pollster",
|
||||
"parking",
|
||||
]
|
||||
|
|
|
|||
28
Cargo.toml
28
Cargo.toml
|
|
@ -4,15 +4,27 @@ version = "0.0.0"
|
|||
edition = "2021"
|
||||
description = "wove"
|
||||
license = "AGPL-3.0-or-later"
|
||||
resolver = "2"
|
||||
|
||||
[dependencies]
|
||||
|
||||
[dev-dependencies]
|
||||
pollster = { version = "0.3.0" }
|
||||
[workspace.dependencies]
|
||||
futures-lite = { version = "2.3.0" }
|
||||
|
||||
[target.'cfg(target_os = "linux")'.dependencies]
|
||||
io-uring = { version = '0.7.1' }
|
||||
libc = { version = "0.2.161" }
|
||||
async-channel = { version = "2.3.1" }
|
||||
[dependencies]
|
||||
io-uring = { version = '0.7.1', optional = true }
|
||||
libc = { version = "0.2.161", optional = true }
|
||||
futures-lite = { workspace = true, optional = true }
|
||||
parking = { version = "2.2.1", optional = true }
|
||||
async-channel = { version = "2.3.1", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
futures-lite = { workspace = true }
|
||||
|
||||
[features]
|
||||
default = ["io_uring"]
|
||||
io_uring = [
|
||||
"dep:io-uring",
|
||||
"dep:libc",
|
||||
"dep:futures-lite",
|
||||
"dep:parking",
|
||||
"dep:async-channel",
|
||||
]
|
||||
|
|
|
|||
|
|
@ -1,10 +1,62 @@
|
|||
use futures_lite::FutureExt;
|
||||
use wove::Wove;
|
||||
/*
|
||||
|
||||
use std::cell::RefCell;
|
||||
|
||||
use futures_lite::{FutureExt, StreamExt as FlStreamExt};
|
||||
|
||||
use futures_util::{
|
||||
select, stream::FuturesUnordered, FutureExt as FuFutureExt, StreamExt as FuStreamExt,
|
||||
};
|
||||
use wove::{io::AsyncRead, io::AsyncWrite, Wove};
|
||||
|
||||
pub async fn go(wove: &Wove) -> std::io::Result<()> {
|
||||
let listener = wove::net::TcpListener::bind(wove, "localhost:3000").await?;
|
||||
let mut listener = wove::net::TcpListener::bind(wove, "127.0.0.1:4838").await?;
|
||||
let mut incoming = listener.incoming(wove);
|
||||
|
||||
todo!()
|
||||
let connections = RefCell::new(FuturesUnordered::new());
|
||||
|
||||
let mut accept = Box::pin(
|
||||
async {
|
||||
while let Some(connection) = FlStreamExt::next(&mut incoming).await {
|
||||
let mut connection = connection?;
|
||||
connections.borrow_mut().push(async move {
|
||||
loop {
|
||||
let data = connection
|
||||
.read(wove, vec![0; 4096].into_boxed_slice())
|
||||
.await?;
|
||||
if data.len() == 0 {
|
||||
break;
|
||||
}
|
||||
|
||||
connection.write(wove, data).await?;
|
||||
}
|
||||
|
||||
std::io::Result::Ok(())
|
||||
});
|
||||
}
|
||||
|
||||
std::io::Result::Ok(())
|
||||
}
|
||||
.fuse(),
|
||||
);
|
||||
|
||||
let mut handle = Box::pin(
|
||||
async {
|
||||
while let Some(out) = FuStreamExt::next(&mut *connections.borrow_mut()).await {
|
||||
out?;
|
||||
}
|
||||
|
||||
std::io::Result::Ok(())
|
||||
}
|
||||
.fuse(),
|
||||
);
|
||||
|
||||
loop {
|
||||
select! {
|
||||
res = handle => res?,
|
||||
res = accept => res?,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn main() {
|
||||
|
|
@ -18,3 +70,7 @@ pub fn main() {
|
|||
|
||||
pollster::block_on(fut).unwrap();
|
||||
}
|
||||
*/
|
||||
|
||||
pub fn main() {
|
||||
}
|
||||
|
|
|
|||
19
src/io.rs
Normal file
19
src/io.rs
Normal file
|
|
@ -0,0 +1,19 @@
|
|||
use std::future::Future;
|
||||
|
||||
use crate::{aliases::IoResult, io_impl::IoImpl};
|
||||
|
||||
pub trait AsyncRead {
|
||||
fn read<I: IoImpl>(
|
||||
&mut self,
|
||||
io: &I,
|
||||
buf: Box<[u8]>,
|
||||
) -> impl Future<Output = IoResult<Box<[u8]>>>;
|
||||
}
|
||||
|
||||
pub trait AsyncWrite {
|
||||
fn write<I: IoImpl>(
|
||||
&mut self,
|
||||
io: &I,
|
||||
buf: Box<[u8]>,
|
||||
) -> impl Future<Output = IoResult<usize>>;
|
||||
}
|
||||
1
src/io_impl/io_impl.rs
Normal file
1
src/io_impl/io_impl.rs
Normal file
|
|
@ -0,0 +1 @@
|
|||
|
||||
293
src/io_impl/io_uring/mod.rs
Normal file
293
src/io_impl/io_uring/mod.rs
Normal file
|
|
@ -0,0 +1,293 @@
|
|||
use std::{
|
||||
future::Future,
|
||||
sync::atomic::{AtomicU64, Ordering},
|
||||
task::{Context, Poll, Waker},
|
||||
};
|
||||
|
||||
use parking::Parker;
|
||||
|
||||
use crate::aliases::IoResult;
|
||||
|
||||
use super::IoImpl;
|
||||
|
||||
type CqueueEntryReceiver = async_channel::Receiver<io_uring::cqueue::Entry>;
|
||||
type CqueueEntrySender = async_channel::Sender<io_uring::cqueue::Entry>;
|
||||
pub struct UserData {
|
||||
tx: CqueueEntrySender,
|
||||
persist: bool,
|
||||
}
|
||||
impl UserData {
|
||||
fn new_boxed(tx: CqueueEntrySender, persist: bool) -> Box<Self> {
|
||||
Box::new(Self { tx, persist })
|
||||
}
|
||||
|
||||
fn into_u64(self: Box<Self>) -> u64 {
|
||||
Box::leak(self) as *mut _ as _
|
||||
}
|
||||
|
||||
unsafe fn from_u64(v: u64) -> Box<UserData> {
|
||||
let v = v as *mut UserData;
|
||||
|
||||
unsafe { Box::from_raw(v) }
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_error(i: i32) -> IoResult<i32> {
|
||||
if i < 0 {
|
||||
return Err(std::io::Error::from_raw_os_error(-i));
|
||||
}
|
||||
|
||||
Ok(i)
|
||||
}
|
||||
|
||||
pub struct Tick {
|
||||
did_handle: bool,
|
||||
submitted: usize,
|
||||
active_completions: usize,
|
||||
}
|
||||
|
||||
pub struct IoUring {
|
||||
uring: io_uring::IoUring,
|
||||
|
||||
// TODO: analyze the atomic orderings passed to the atomic operations here
|
||||
// to make sure they make sense
|
||||
|
||||
// TODO: I'm not sure that atomics even make sense here, but they let this
|
||||
// method work with &self instead of &mut self so :shrug:
|
||||
active_completions: AtomicU64,
|
||||
|
||||
_pd: core::marker::PhantomData<std::cell::RefCell<()>>,
|
||||
}
|
||||
impl IoUring {
|
||||
pub fn new() -> IoResult<Self> {
|
||||
let uring = io_uring::IoUring::new(256)?;
|
||||
|
||||
Ok(Self {
|
||||
uring,
|
||||
active_completions: AtomicU64::new(0),
|
||||
|
||||
_pd: Default::default(),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn submit(&self, wait_for: usize) -> IoResult<usize> {
|
||||
let submitted_count = self.uring.submit_and_wait(wait_for)?;
|
||||
self.active_completions
|
||||
.fetch_add(submitted_count as u64, Ordering::Relaxed);
|
||||
|
||||
Ok(submitted_count)
|
||||
}
|
||||
|
||||
/// Returns the current number of active requests, and a boolean indicating
|
||||
/// if we actually handled any events.
|
||||
pub fn poll(&self) -> IoResult<(usize, bool)> {
|
||||
let mut did_handle = false;
|
||||
// SAFETY:
|
||||
// - this method is synchronous, and `cq` is dropped at the end of the scope
|
||||
// - [`IoUring`] is !Sync, so it should be impossible for 2 threads to be
|
||||
// running this at the same time
|
||||
let cq = unsafe { self.uring.completion_shared() };
|
||||
for entry in cq {
|
||||
did_handle = true;
|
||||
|
||||
let ud = unsafe { UserData::from_u64(entry.user_data()) };
|
||||
ud.tx.send_blocking(entry).unwrap();
|
||||
if ud.persist {
|
||||
Box::leak(ud);
|
||||
} else {
|
||||
self.active_completions.fetch_sub(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
Ok((
|
||||
self.active_completions.load(Ordering::Relaxed) as usize,
|
||||
did_handle,
|
||||
))
|
||||
}
|
||||
|
||||
pub fn tick(&self) -> IoResult<Tick> {
|
||||
let submitted = self.submit(0)?;
|
||||
let (active_completions, did_handle) = self.poll()?;
|
||||
|
||||
Ok(Tick {
|
||||
active_completions,
|
||||
did_handle,
|
||||
submitted,
|
||||
})
|
||||
}
|
||||
|
||||
/// Runs a future. Conceptually, this is similar to [`Self::block_on`], but
|
||||
/// instead of acting as its own executor, this allows us to be embedded in
|
||||
/// another runtime
|
||||
pub async fn run<F: Future>(&self, fut: F, behavior: ActiveRequestBehavior) -> Run<F> {
|
||||
Run(&self, fut, behavior)
|
||||
}
|
||||
|
||||
pub fn block_on<F: Future>(&self, fut: F) -> F::Output {
|
||||
futures_lite::pin!(fut);
|
||||
|
||||
let parker = Parker::new();
|
||||
let unparker = parker.unparker();
|
||||
let waker = Waker::from(unparker);
|
||||
|
||||
let cx = &mut Context::from_waker(&waker);
|
||||
|
||||
loop {
|
||||
// Check if the future is ready. If so, return the value.
|
||||
if let Poll::Ready(v) = fut.as_mut().poll(cx) {
|
||||
return v;
|
||||
}
|
||||
|
||||
let Tick {
|
||||
did_handle,
|
||||
active_completions,
|
||||
submitted,
|
||||
} = self.tick().unwrap();
|
||||
|
||||
if did_handle {
|
||||
// If we handled an event, it's likely that our future can make progress,
|
||||
// so continue the loop
|
||||
continue;
|
||||
}
|
||||
|
||||
if submitted > 0 {
|
||||
// We submitted an event. It's possible that our future can make progress
|
||||
// once we poll, so continue the loop
|
||||
continue;
|
||||
}
|
||||
|
||||
if active_completions > 0 {
|
||||
// We didn't submit an event, but we do have completions in-flight.
|
||||
// We should block until one of them completes, and then re-poll
|
||||
self.submit(1).unwrap();
|
||||
continue;
|
||||
}
|
||||
|
||||
// If we've gotten to this point, it's likely that we're waiting on a
|
||||
// future that depends on another thread to make progress. In that case,
|
||||
// park the current thread until the waker is called.
|
||||
parker.park();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl IoImpl for IoUring {
|
||||
async fn sleep(&self, duration: std::time::Duration) -> IoResult<()> {
|
||||
let (tx, rx) = async_channel::bounded(1);
|
||||
|
||||
let ts = io_uring::types::Timespec::new()
|
||||
.sec(duration.as_secs())
|
||||
.nsec(duration.subsec_nanos());
|
||||
let entry = io_uring::opcode::Timeout::new(&ts as *const _)
|
||||
.build()
|
||||
.user_data(UserData::new_boxed(tx, false).into_u64());
|
||||
|
||||
unsafe { self.uring.submission_shared().push(&entry).unwrap() }
|
||||
|
||||
let entry = rx.recv().await.unwrap();
|
||||
handle_error(entry.result())?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Behavior used in [`Run`] when the future is not ready, the last [`IoUring::tick`]
|
||||
/// didn't submit or handle any events, and there are requests in flight
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
pub enum ActiveRequestBehavior {
|
||||
/// Cause a panic
|
||||
Panic,
|
||||
|
||||
/// Block until a completion is returned
|
||||
Block,
|
||||
|
||||
/// Return pending.
|
||||
///
|
||||
/// <div class="warning">
|
||||
///
|
||||
/// NOTE: this relies on the fact that the executor is going to poll this
|
||||
/// _eventually_. If it doesn't, it's likely that you'll get a deadlock.
|
||||
///
|
||||
/// </div>
|
||||
Pending,
|
||||
}
|
||||
|
||||
pub struct Run<'a, F: Future>(&'a IoUring, F, ActiveRequestBehavior);
|
||||
impl<F: Future> Future for Run<'_, F> {
|
||||
type Output = F::Output;
|
||||
|
||||
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let io = self.0;
|
||||
let behavior = self.2;
|
||||
let fut = unsafe { self.map_unchecked_mut(|s| &mut s.1) };
|
||||
|
||||
if let Poll::Ready(out) = fut.poll(cx) {
|
||||
return Poll::Ready(out);
|
||||
}
|
||||
|
||||
let Tick {
|
||||
did_handle,
|
||||
submitted,
|
||||
active_completions,
|
||||
} = io.tick().unwrap();
|
||||
|
||||
if did_handle {
|
||||
// We handled an event, it's likely that the future can make progress
|
||||
cx.waker().wake_by_ref();
|
||||
return Poll::Pending;
|
||||
}
|
||||
|
||||
if submitted > 0 {
|
||||
// We submitted an event, it's possible that the future can make progress,
|
||||
// so we should wake the executor and get re-polled
|
||||
cx.waker().wake_by_ref();
|
||||
return Poll::Pending;
|
||||
}
|
||||
|
||||
if active_completions > 0 {
|
||||
// We have completions in flight, but they're not ready yet.
|
||||
match behavior {
|
||||
ActiveRequestBehavior::Panic => {
|
||||
panic!("The future was not ready, and there are completions in-flight")
|
||||
},
|
||||
ActiveRequestBehavior::Block => {
|
||||
io.submit(1).unwrap();
|
||||
return Poll::Pending;
|
||||
},
|
||||
ActiveRequestBehavior::Pending => {
|
||||
return Poll::Pending;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// The future likely depends on another thread, so return pending
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
mod block_on {
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::io_impl::io_uring::IoUring;
|
||||
|
||||
#[test]
|
||||
fn simple() {
|
||||
let uring = IoUring::new().unwrap();
|
||||
let out = uring.block_on(async { 5 });
|
||||
assert_eq!(out, 5);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn timer() {
|
||||
let uring = &IoUring::new().unwrap();
|
||||
let out = uring.block_on(async {
|
||||
crate::time::sleep(uring, Duration::from_secs(1)).await;
|
||||
5
|
||||
});
|
||||
|
||||
assert_eq!(out, 5);
|
||||
}
|
||||
}
|
||||
}
|
||||
9
src/io_impl/mod.rs
Normal file
9
src/io_impl/mod.rs
Normal file
|
|
@ -0,0 +1,9 @@
|
|||
use crate::aliases::IoResult;
|
||||
use std::time::Duration;
|
||||
|
||||
#[cfg(feature = "io_uring")]
|
||||
pub mod io_uring;
|
||||
|
||||
pub trait IoImpl {
|
||||
async fn sleep(&self, duration: Duration) -> IoResult<()>;
|
||||
}
|
||||
86
src/lib.rs
86
src/lib.rs
|
|
@ -1,88 +1,14 @@
|
|||
#![feature(async_closure)]
|
||||
#![feature(async_iterator)]
|
||||
#![feature(never_type)]
|
||||
|
||||
mod aliases;
|
||||
pub mod fs;
|
||||
pub mod net;
|
||||
mod plat;
|
||||
mod wove;
|
||||
// pub mod fs;
|
||||
pub mod io;
|
||||
pub mod io_impl;
|
||||
// pub mod net;
|
||||
pub mod time;
|
||||
|
||||
use std::{future::Future, pin::Pin, task::Poll};
|
||||
|
||||
use aliases::IoResult;
|
||||
#[cfg(target_os = "linux")]
|
||||
pub(crate) use plat::linux as plat_impl;
|
||||
|
||||
pub struct Tock(bool);
|
||||
impl Tock {
|
||||
fn new() -> Self {
|
||||
Self(false)
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for Tock {
|
||||
type Output = ();
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
|
||||
if self.0 {
|
||||
return Poll::Ready(());
|
||||
}
|
||||
|
||||
let this = unsafe { self.get_unchecked_mut() };
|
||||
this.0 = true;
|
||||
cx.waker().wake_by_ref();
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Wove {
|
||||
platform: plat_impl::Platform,
|
||||
}
|
||||
|
||||
impl Wove {
|
||||
pub fn new() -> IoResult<Self> {
|
||||
Ok(Self {
|
||||
platform: plat_impl::Platform::new()?,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn run(&self) -> IoResult<!> {
|
||||
loop {
|
||||
self.platform.tick().await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::path::PathBuf;
|
||||
|
||||
use crate::Wove;
|
||||
use futures_lite::future::try_zip;
|
||||
|
||||
#[test]
|
||||
fn sketch() {
|
||||
let mut wove = Wove::new().unwrap();
|
||||
let wove = &mut wove;
|
||||
let fut = async {
|
||||
let run = async {
|
||||
wove.run().await?;
|
||||
};
|
||||
|
||||
let hello = crate::fs::read_to_string(wove, PathBuf::from("test_data/hello.txt"));
|
||||
let goodbye =
|
||||
crate::fs::read_to_string(wove, PathBuf::from("test_data/goodbye.txt"));
|
||||
let both = try_zip(hello, goodbye);
|
||||
|
||||
let contents = futures_lite::future::race(both, run).await;
|
||||
|
||||
contents
|
||||
};
|
||||
|
||||
let (hello, goodbye) = pollster::block_on(fut).unwrap();
|
||||
|
||||
assert_eq!(hello, "hello!\n");
|
||||
assert_eq!(goodbye, "goodbye!\n");
|
||||
}
|
||||
}
|
||||
|
|
|
|||
67
src/net.rs
67
src/net.rs
|
|
@ -1,6 +1,34 @@
|
|||
use std::net::ToSocketAddrs;
|
||||
use std::{async_iter::AsyncIterator, net::ToSocketAddrs, pin::Pin};
|
||||
|
||||
use crate::{aliases::IoResult, plat_impl, Wove};
|
||||
use futures_lite::Stream;
|
||||
|
||||
use crate::{
|
||||
aliases::IoResult,
|
||||
io::{AsyncRead, AsyncWrite},
|
||||
plat_impl, Wove,
|
||||
};
|
||||
|
||||
pub struct TcpStream(pub(crate) plat_impl::FileHandle);
|
||||
|
||||
impl AsyncRead for TcpStream {
|
||||
fn read(
|
||||
&mut self,
|
||||
wove: &Wove,
|
||||
buf: Box<[u8]>,
|
||||
) -> impl std::future::Future<Output = IoResult<Box<[u8]>>> {
|
||||
plat_impl::read(&wove.platform, &mut self.0, 0, buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncWrite for TcpStream {
|
||||
fn write(
|
||||
&mut self,
|
||||
wove: &Wove,
|
||||
buf: Box<[u8]>,
|
||||
) -> impl std::future::Future<Output = IoResult<usize>> {
|
||||
plat_impl::write(&wove.platform, &mut self.0, 0, buf)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TcpListener(plat_impl::FileHandle);
|
||||
|
||||
|
|
@ -22,4 +50,39 @@ impl TcpListener {
|
|||
|
||||
Err(std::io::Error::other("No addrs returned"))
|
||||
}
|
||||
|
||||
pub fn incoming<'a>(&'a mut self, wove: &'a Wove) -> Incoming<'a> {
|
||||
Incoming::register(wove, self)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Incoming<'a>(plat_impl::Incoming<'a>);
|
||||
impl<'a> Incoming<'a> {
|
||||
fn register(wove: &'a Wove, listener: &'a mut TcpListener) -> Self {
|
||||
Incoming(plat_impl::accept_many(&wove.platform, &mut listener.0))
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncIterator for Incoming<'_> {
|
||||
type Item = IoResult<TcpStream>;
|
||||
|
||||
fn poll_next(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Option<Self::Item>> {
|
||||
let inner = unsafe { self.map_unchecked_mut(|s| &mut s.0) };
|
||||
|
||||
AsyncIterator::poll_next(inner, cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for Incoming<'_> {
|
||||
type Item = IoResult<TcpStream>;
|
||||
|
||||
fn poll_next(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Option<Self::Item>> {
|
||||
AsyncIterator::poll_next(self, cx)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,43 +1,17 @@
|
|||
use std::{
|
||||
ffi::CString, future::Future, net::IpAddr, net::SocketAddr, path::PathBuf, pin::Pin,
|
||||
async_iter::AsyncIterator,
|
||||
ffi::CString,
|
||||
future::Future,
|
||||
net::{IpAddr, SocketAddr},
|
||||
os::fd::IntoRawFd,
|
||||
path::PathBuf,
|
||||
pin::Pin,
|
||||
};
|
||||
|
||||
use futures_lite::Stream;
|
||||
use io_uring::IoUring;
|
||||
|
||||
use crate::{aliases::IoResult, Tock};
|
||||
|
||||
pub type CallbackFuture = Pin<Box<dyn Future<Output = ()> + 'static>>;
|
||||
pub type Callback = Box<dyn FnOnce(io_uring::cqueue::Entry) -> CallbackFuture + 'static>;
|
||||
|
||||
fn handle_error(i: i32) -> IoResult<i32> {
|
||||
if i < 0 {
|
||||
return Err(std::io::Error::from_raw_os_error(-i));
|
||||
}
|
||||
|
||||
Ok(i)
|
||||
}
|
||||
|
||||
fn box_callback_as_u64<T: 'static>(
|
||||
tx: async_channel::Sender<IoResult<T>>,
|
||||
callback: impl async FnOnce(io_uring::cqueue::Entry) -> IoResult<T> + 'static,
|
||||
) -> u64 {
|
||||
let bx: Callback = Box::new(move |entry| {
|
||||
Box::pin(async move {
|
||||
tx.send(callback(entry).await).await.unwrap();
|
||||
})
|
||||
});
|
||||
let bx: Box<Callback> = Box::new(bx);
|
||||
let leaked = Box::leak(bx);
|
||||
let ptr = leaked as *mut _;
|
||||
|
||||
ptr as u64
|
||||
}
|
||||
|
||||
unsafe fn u64_as_callback(v: u64) -> Box<Callback> {
|
||||
let v = v as *mut Callback;
|
||||
|
||||
unsafe { Box::from_raw(v) }
|
||||
}
|
||||
use crate::{aliases::IoResult, net::TcpStream};
|
||||
|
||||
pub struct PlatformLinux {
|
||||
uring: io_uring::IoUring,
|
||||
|
|
@ -60,12 +34,13 @@ impl PlatformLinux {
|
|||
|
||||
let cq = unsafe { self.uring.completion_shared() };
|
||||
for entry in cq {
|
||||
let cb = unsafe { u64_as_callback(entry.user_data()) };
|
||||
cb(entry).await;
|
||||
let ud = unsafe { UserData::from_u64(entry.user_data()) };
|
||||
ud.tx.send(entry).await.unwrap();
|
||||
if ud.persist {
|
||||
Box::leak(ud);
|
||||
}
|
||||
}
|
||||
|
||||
Tock::new().await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
@ -77,27 +52,33 @@ pub async fn open_file(p: &PlatformLinux, path: PathBuf) -> IoResult<FileHandle>
|
|||
let entry =
|
||||
io_uring::opcode::OpenAt::new(io_uring::types::Fd(libc::AT_FDCWD), path.as_ptr())
|
||||
.build()
|
||||
.user_data(box_callback_as_u64(tx, async move |entry| {
|
||||
let fd = handle_error(entry.result())?;
|
||||
let fd = io_uring::types::Fd(fd);
|
||||
|
||||
std::mem::drop(path);
|
||||
|
||||
Ok(fd)
|
||||
}));
|
||||
.user_data(UserData::new_boxed(tx, false).into_u64());
|
||||
|
||||
unsafe {
|
||||
p.uring.submission_shared().push(&entry).unwrap();
|
||||
}
|
||||
|
||||
rx.recv().await.unwrap()
|
||||
let entry = rx.recv().await.unwrap();
|
||||
let fd = handle_error(entry.result())?;
|
||||
let fd = io_uring::types::Fd(fd);
|
||||
|
||||
Ok(fd)
|
||||
}
|
||||
|
||||
pub async fn open_tcp_socket(
|
||||
p: &PlatformLinux,
|
||||
socket_addr: SocketAddr,
|
||||
) -> IoResult<FileHandle> {
|
||||
let (tx, rx) = async_channel::bounded(1);
|
||||
// FIXME(Blocking)
|
||||
// There is some some magic missing in the commented out code to make the
|
||||
// socket clean up properly on process exit and whatnot. For now, just use
|
||||
// the std implementation and cast it to a FileHandle
|
||||
|
||||
let listener = std::net::TcpListener::bind(socket_addr)?;
|
||||
Ok(io_uring::types::Fd(listener.into_raw_fd()))
|
||||
|
||||
/*
|
||||
// let (tx, rx) = async_channel::bounded(1);
|
||||
|
||||
let domain = match () {
|
||||
_ if socket_addr.is_ipv4() => libc::AF_INET,
|
||||
|
|
@ -107,34 +88,42 @@ pub async fn open_tcp_socket(
|
|||
|
||||
let entry = io_uring::opcode::Socket::new(domain, libc::SOCK_STREAM, 0)
|
||||
.build()
|
||||
.user_data(box_callback_as_u64(tx, async move |entry| {
|
||||
let res = handle_error(entry.result())?;
|
||||
.user_data(UserData::new_boxed(tx, false).into_u64());
|
||||
|
||||
let sock = libc::sockaddr_in {
|
||||
sin_family: domain as _,
|
||||
sin_port: socket_addr.port(),
|
||||
sin_addr: libc::in_addr {
|
||||
s_addr: match socket_addr.ip() {
|
||||
IpAddr::V4(v) => v.to_bits(),
|
||||
IpAddr::V6(v) => panic!(),
|
||||
},
|
||||
},
|
||||
sin_zero: Default::default(),
|
||||
};
|
||||
unsafe {
|
||||
p.uring.submission_shared().push(&entry).unwrap();
|
||||
}
|
||||
|
||||
// FIXME(Blocking)
|
||||
let fd = handle_error(unsafe {
|
||||
libc::bind(
|
||||
res,
|
||||
&sock as *const _ as *const _,
|
||||
core::mem::size_of_val(&sock) as u32,
|
||||
)
|
||||
})?;
|
||||
let entry = rx.recv().await.unwrap();
|
||||
|
||||
Ok(io_uring::types::Fd(fd))
|
||||
}));
|
||||
let fd = handle_error(entry.result())?;
|
||||
|
||||
rx.recv().await.unwrap()
|
||||
let sock = libc::sockaddr_in {
|
||||
sin_family: domain as _,
|
||||
sin_port: socket_addr.port().to_be(),
|
||||
sin_addr: libc::in_addr {
|
||||
s_addr: match socket_addr.ip() {
|
||||
IpAddr::V4(v) => v.to_bits().to_be(),
|
||||
IpAddr::V6(_) => panic!(),
|
||||
},
|
||||
},
|
||||
sin_zero: Default::default(),
|
||||
};
|
||||
|
||||
// FIXME(Blocking)
|
||||
handle_error(unsafe {
|
||||
libc::bind(
|
||||
fd,
|
||||
&sock as *const _ as *const _,
|
||||
core::mem::size_of_val(&sock) as u32,
|
||||
)
|
||||
})?;
|
||||
|
||||
// FIXME(Blocking)
|
||||
handle_error(unsafe { libc::listen(fd, libc::SOMAXCONN) })?;
|
||||
|
||||
Ok(io_uring::types::Fd(fd))
|
||||
*/
|
||||
}
|
||||
|
||||
pub async fn read(
|
||||
|
|
@ -148,20 +137,115 @@ pub async fn read(
|
|||
let entry = io_uring::opcode::Read::new(*f, buf.as_mut_ptr(), buf.len() as _)
|
||||
.offset(offset as _)
|
||||
.build()
|
||||
.user_data(box_callback_as_u64(tx, async move |entry| {
|
||||
let read_amt = handle_error(entry.result())?;
|
||||
|
||||
let mut buf = buf.into_vec();
|
||||
buf.truncate(read_amt as _);
|
||||
|
||||
Ok(buf.into_boxed_slice())
|
||||
}));
|
||||
.user_data(UserData::new_boxed(tx, false).into_u64());
|
||||
|
||||
unsafe {
|
||||
p.uring.submission_shared().push(&entry).unwrap();
|
||||
}
|
||||
|
||||
rx.recv().await.unwrap()
|
||||
let entry = rx.recv().await.unwrap();
|
||||
let read_amt = handle_error(entry.result())?;
|
||||
|
||||
let mut buf = buf.into_vec();
|
||||
buf.truncate(read_amt as _);
|
||||
|
||||
Ok(buf.into_boxed_slice())
|
||||
}
|
||||
|
||||
pub async fn write(
|
||||
p: &PlatformLinux,
|
||||
f: &mut FileHandle,
|
||||
offset: usize,
|
||||
mut buf: Box<[u8]>,
|
||||
) -> IoResult<usize> {
|
||||
let (tx, rx) = async_channel::bounded(1);
|
||||
|
||||
let entry = io_uring::opcode::Write::new(*f, buf.as_mut_ptr(), buf.len() as _)
|
||||
.offset(offset as _)
|
||||
.build()
|
||||
.user_data(UserData::new_boxed(tx, false).into_u64());
|
||||
|
||||
unsafe {
|
||||
p.uring.submission_shared().push(&entry).unwrap();
|
||||
}
|
||||
|
||||
let entry = rx.recv().await.unwrap();
|
||||
let write_amt = handle_error(entry.result())?;
|
||||
|
||||
Ok(write_amt as usize)
|
||||
}
|
||||
|
||||
pub(crate) struct Incoming<'a> {
|
||||
plat: &'a PlatformLinux,
|
||||
rx: Pin<Box<CqueueEntryReceiver>>,
|
||||
cb_id: u64,
|
||||
}
|
||||
|
||||
pub fn accept_many<'a>(p: &'a PlatformLinux, f: &mut FileHandle) -> Incoming<'a> {
|
||||
let (tx, rx) = async_channel::unbounded();
|
||||
|
||||
let cb_id = UserData::new_boxed(tx, true).into_u64();
|
||||
|
||||
let entry = io_uring::opcode::AcceptMulti::new(*f)
|
||||
.build()
|
||||
.user_data(cb_id);
|
||||
|
||||
unsafe {
|
||||
p.uring.submission_shared().push(&entry).unwrap();
|
||||
}
|
||||
|
||||
Incoming {
|
||||
plat: p,
|
||||
rx: Box::pin(rx),
|
||||
cb_id,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn cancel(p: &PlatformLinux, id: u64) {
|
||||
let entry = io_uring::opcode::AsyncCancel::new(id).build();
|
||||
|
||||
unsafe {
|
||||
p.uring.submission_shared().push(&entry).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncIterator for Incoming<'_> {
|
||||
type Item = IoResult<TcpStream>;
|
||||
|
||||
fn poll_next(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Option<Self::Item>> {
|
||||
let rx = unsafe { self.map_unchecked_mut(|s| &mut s.rx) };
|
||||
let mut fut = rx.recv();
|
||||
let pinned = unsafe { Pin::new_unchecked(&mut fut) };
|
||||
|
||||
pinned
|
||||
.poll(cx)
|
||||
.map(|entry| {
|
||||
let fd = handle_error(entry.unwrap().result())?;
|
||||
|
||||
Ok(crate::net::TcpStream(io_uring::types::Fd(fd)))
|
||||
})
|
||||
.map(Some)
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for Incoming<'_> {
|
||||
type Item = IoResult<TcpStream>;
|
||||
|
||||
fn poll_next(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Option<Self::Item>> {
|
||||
AsyncIterator::poll_next(self, cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Incoming<'_> {
|
||||
fn drop(&mut self) {
|
||||
cancel(self.plat, self.cb_id);
|
||||
}
|
||||
}
|
||||
|
||||
pub type Platform = PlatformLinux;
|
||||
|
|
|
|||
7
src/time.rs
Normal file
7
src/time.rs
Normal file
|
|
@ -0,0 +1,7 @@
|
|||
use std::time::Duration;
|
||||
|
||||
use crate::io_impl::IoImpl;
|
||||
|
||||
pub async fn sleep<I: IoImpl>(io: &I, duration: Duration) {
|
||||
let _ = io.sleep(duration).await;
|
||||
}
|
||||
Loading…
Reference in a new issue