Rust async execution
As a longtime C/C++ programmer who’s done a fair amount of concurrency work (especially network stuff for Fanout), I’m intrigued by the Rust programming language and its recent async capabilities.
Like C/C++, Rust has no runtime. Interestingly, this remains true even with the introduction of async. Using the async or await keywords doesn’t just magically run code concurrently. It’s on you to bring your own runtime, such as tokio or async-std, to interface with the core language elements.
But you can also implement your own runtime! Over the past few months I’ve been learning all the little implementation details. In this article I’ll describe how to execute Rust async functions using nothing but the standard library.
Much has already been written about async and futures elsewhere, so this article will focus mostly on how to build an executor.
Language vs. runtime
Basically, the Rust language provides:
- The Future trait, which allows stepping through an execution of something.
- The async keyword, which rewrites your code to implement Future.
- The await keyword, which allows using other Future instances within async-generated code.
And that’s it. Notably, Rust provides no concrete implementations of Future outside of the ones you ask it to generate with the async keyword.
In order to do anything useful with Rust async, you’ll need some non-generated implementations of Future (using only generated futures is pointless), and you’ll need a way to execute the Future instances.
Personally, I think this is a brilliant design. Rust is able to provide nice async syntax without having to commit to a particular runtime.
Note that even though it sounds like the language itself doesn’t provide much, the async code generation it does provide solves a genuinely challenging problem.
Implementing a Future
Here’s the declaration for the Future trait:
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}
You can implement a Future by hand. For example, here’s one that produces an integer:
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
struct IntFuture {
x: i32,
}
impl Future for IntFuture {
type Output = i32;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<i32> {
Poll::Ready(self.x)
}
}
Or you could generate the same thing using the async keyword:
async fn int_future(x: i32) -> i32 {
x
}
In both of the above cases, we end up with a type that satisfies the trait Future<Output = i32>:
fn eat<F: Future<Output = i32>>(_: F) {}
fn main() {
eat(IntFuture { x: 42 });
eat(int_future(42));
}
Nested futures
If you have a chain of async functions, for example an HTTP request async function that calls some TCP I/O async functions, it will be compiled into a single encapsulating Future
. Polling this future will cause the inner ones to get polled. Whatever is doing the polling has no awareness of any inner futures.
For example:
async fn get_audience() -> &'static str {
"world"
}
async fn make_greeting() -> String {
let audience = get_audience().await;
format!("hello {}", audience)
}
In the above code, if make_greeting() is called to get a future, polling that future will in turn poll the future generated by get_audience(), but this could be considered an implementation detail of make_greeting().
All this to say: when it comes to executing futures, we really only need to account for the top-level futures.
Calling poll
Making a Future is somewhat straightforward. Polling a Future, not so much. Let’s look at the signature for poll() again:
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
We need a Pin and a Context. What the heck are those, you might ask?
Pin
Pin is a way of indicating that some memory won’t be moved. Normally, structs can be moved from one memory location to another without any issue, because Rust forbids self-references in safe code. For example, a struct can exist on the stack, then be moved to a Box on the heap, and Rust can perform the move by simply copying the bytes, a la memcpy. However, one of the great achievements of Rust async is borrowing across await points, and this requires bending the rules a little bit. An async-generated Future needs to be able to keep references to its own internal memory when await is used, and thus it needs assurance that its memory won’t be moved around between poll() calls.
The way Pin is used here is a little unintuitive. The poll() function consumes the Pin. This means that in order to poll a future, you need to instantiate a new Pin wrapper every time you poll. Here’s how that looks:
let mut f = IntFuture { x: 42 };
let mut cx = ... // we'll talk about this later
let p = unsafe { Pin::new_unchecked(&mut f) };
match p.poll(&mut cx) {
Poll::Ready(x) => println!("got int: {}", x),
Poll::Pending => println!("future not ready"),
}
(Side note: it’s interesting that Pin can be used as the self type. Rust doesn’t limit self to just T, &T, and &mut T; it also allows certain other types from a fixed list.)
Are you off the hook about keeping the memory pinned once the Pin has been consumed and dropped by the poll() method? Nope! Quoth the docs: “A value, once pinned, must remain pinned forever.” In fact, this is why constructing the Pin is unsafe. The unsafe part is that you’ll eventually lose the Pin, but you’ll still need to uphold the pinning contract despite not having the Pin to protect you anymore.
Context and Waker
Currently, the only thing Context does is provide access to a Waker. The Waker is used to indicate when the future should be polled again, if poll() returns Poll::Pending. The reason poll() takes a Context rather than simply a Waker is to allow for expansion: other things may be added to Context in later Rust versions.
Constructing a Context takes some effort. Its sole constructor, Context::from_waker, needs a Waker. The sole constructor for Waker, Waker::from_raw, needs a RawWaker. And the sole constructor for RawWaker, RawWaker::new, needs a RawWakerVTable. Still with me?
Let’s implement a minimal RawWakerVTable of no-ops:
use std::task::{RawWaker, RawWakerVTable};
unsafe fn vt_clone(data: *const ()) -> RawWaker {
RawWaker::new(data, &VTABLE)
}
unsafe fn vt_wake(_data: *const ()) {
}
unsafe fn vt_wake_by_ref(_data: *const ()) {
}
unsafe fn vt_drop(_data: *const ()) {
}
static VTABLE: RawWakerVTable = RawWakerVTable::new(
vt_clone,
vt_wake,
vt_wake_by_ref,
vt_drop
);
Then we can construct a Waker like this:
let rw = RawWaker::new(&(), &VTABLE);
let w = unsafe { Waker::from_raw(rw) };
All this vtable stuff is to allow us to provide our own waking behavior. RawWaker is just a data pointer and a vtable. Waker wraps that and implements familiar Rust traits like Clone and Drop. The Waker constructor is unsafe since the vtable functions may need to dereference raw pointers.
You may wonder why Rust uses this custom vtable instead of just making Waker a trait. I believe it’s done this way so Waker can be ownable while also avoiding a heap allocation. Using a trait probably would have required a Box somewhere.
Finally, we can construct a Context:
let mut cx = Context::from_waker(&w);
Whew!
Of course, in a real application we’ll need our Waker to actually do something. We’ll discuss that later.
Calling poll, for real this time
Now that we know how to construct a Pin and a Context, we can call poll(). Here’s the entire source of a program that polls a future:
use std::future::Future;
use std::pin::Pin;
use std::task::{Poll, Context, Waker, RawWaker, RawWakerVTable};
unsafe fn vt_clone(data: *const ()) -> RawWaker {
RawWaker::new(data, &VTABLE)
}
unsafe fn vt_wake(_data: *const ()) {
}
unsafe fn vt_wake_by_ref(_data: *const ()) {
}
unsafe fn vt_drop(_data: *const ()) {
}
static VTABLE: RawWakerVTable = RawWakerVTable::new(
vt_clone,
vt_wake,
vt_wake_by_ref,
vt_drop
);
async fn get_greeting() -> &'static str {
"hello world"
}
fn main() {
let mut f = get_greeting();
let rw = RawWaker::new(&(), &VTABLE);
let w = unsafe { Waker::from_raw(rw) };
let mut cx = Context::from_waker(&w);
let p = unsafe { Pin::new_unchecked(&mut f) };
assert_eq!(p.poll(&mut cx), Poll::Ready("hello world"));
}
Triggering the Waker
Let’s make a future that knows how to wake itself.
Below is an implementation of a timer. It can be constructed with a desired duration. The first time it is polled, it spawns a thread and returns Poll::Pending. The thread sleeps for the duration and then calls wake(). The next time the future is polled, it returns Poll::Ready.
use std::time;
use std::thread;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
struct TimerFuture {
duration: time::Duration,
handle: Option<thread::JoinHandle<()>>,
}
impl TimerFuture {
fn new(duration: time::Duration) -> Self {
Self {
duration,
handle: None,
}
}
}
impl Future for TimerFuture {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
match &self.handle {
None => {
let duration = self.duration;
let waker = cx.waker().clone();
self.handle = Some(thread::spawn(move || {
thread::sleep(duration);
waker.wake();
}));
Poll::Pending
},
Some(_) => {
let handle = self.handle.take().unwrap();
handle.join().unwrap();
Poll::Ready(())
},
}
}
}
// convenience wrapper for use in async functions
fn sleep(duration: time::Duration) -> TimerFuture {
TimerFuture::new(duration)
}
The Waker is cloned so we can keep it around after poll() returns. In fact, we move it to the other thread.
Note that in a real application you would not want to spawn a thread for every timer. Instead, timers would probably be registered with some evented reactor. For this example though, we’ll keep it simple.
Managing different kinds of futures
Before we get to the executor, we need to address one last challenge: varying Future types.
Different futures can have different Output types (e.g. Future<Output = i32> and Future<Output = String>), and thus different poll() return values. This means that if we build an executor, we can’t simply throw the futures into something like Vec<Box<dyn Future>>, and even if that were possible we wouldn’t be able to process them using the same code.
The solution to this, as far as I can tell, is to pick a common return type for all of the futures tracked by the executor (i.e. the top-level futures). For example, you could decide that all top-level futures should have no return value, so you can contain them in a Vec<Box<dyn Future<Output = ()>>>. Note that nested futures can still have arbitrary return types. An async function with no return value can await a future that returns String. This works because all the nested futures are hidden within the outer future, and the executor only needs to care about the outer future.
Our typing problems don’t end there, though. The poll() function requires a pinned reference. Recall the capitalized Self in its signature:
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
This means even if two different future implementations have the same Output type and thus conform to the same trait, we still can’t call poll() on them through an ordinary reference. What the what? The pinning requirement means a bare Box<dyn Future<Output = ()>> is sort of useless on its own: the trait object has to be wrapped in a Pin before it can be polled.
A way around this is to hide the details in closures. We can use monomorphization to generate different code for each concrete future implementation, but have the closures share the same function signature. Below, we create closures that conform to the trait dyn FnMut(&mut Context) -> Poll<()> (for futures with Output = ()) and box them:
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
type PollFn = dyn FnMut(&mut Context) -> Poll<()>;
struct WrappedFuture {
poll_fn: Box<PollFn>,
}
impl WrappedFuture {
pub fn new<F>(mut f: F) -> Self
where
F: Future<Output = ()> + 'static
{
let c = move |cx: &mut Context| {
let p: Pin<&mut F> = unsafe { Pin::new_unchecked(&mut f) };
match p.poll(cx) {
Poll::Ready(_) => Poll::Ready(()),
Poll::Pending => Poll::Pending,
}
};
Self {
poll_fn: Box::new(c),
}
}
pub fn poll(&mut self, cx: &mut Context) -> Poll<()> {
(self.poll_fn)(cx)
}
}
With WrappedFuture, we can treat all of our futures the same:
// generates Future<Output = ()>
async fn print_hello() {
println!("hello");
}
// generates Future<Output = ()>
async fn print_goodbye() {
println!("goodbye");
}
fn main() {
let mut futures: Vec<WrappedFuture> = Vec::new();
futures.push(WrappedFuture::new(print_hello()));
futures.push(WrappedFuture::new(print_goodbye()));
for f in futures.iter_mut() {
let mut cx = ... // context
assert_eq!(f.poll(&mut cx), Poll::Ready(()));
}
}
A simple executor
To execute our futures, we need to do three things:
- Keep track of the futures somewhere.
- Poll the futures when they are created.
- Implement Waker so we know when to poll them again.
Below is a basic executor. It has two vectors, need_poll and sleeping, to keep track of the futures. Calling spawn adds a future to need_poll.
Instead of working with WrappedFuture directly, we wrap it with Arc/Mutex so the futures can be shared across threads. We declare an alias, SharedFuture, to cut down on the noise.
use std::future::Future;
use std::mem;
use std::sync::{Arc, Condvar, Mutex};
use std::task::{Context, Poll};
type SharedFuture = Arc<Mutex<WrappedFuture>>;
struct ExecutorData {
need_poll: Vec<SharedFuture>,
sleeping: Vec<SharedFuture>,
}
struct Executor {
data: Arc<(Mutex<ExecutorData>, Condvar)>,
}
impl Executor {
pub fn new() -> Self {
let data = ExecutorData {
need_poll: Vec::new(),
sleeping: Vec::new(),
};
Self {
data: Arc::new((Mutex::new(data), Condvar::new())),
}
}
pub fn spawn<F>(&self, f: F)
where
F: Future<Output = ()> + 'static
{
let (lock, _) = &*self.data;
let mut data = lock.lock().unwrap();
data.need_poll.push(Arc::new(Mutex::new(WrappedFuture::new(f))));
}
pub fn wake(
data: &mut Arc<(Mutex<ExecutorData>, Condvar)>,
wf: &SharedFuture
) {
let (lock, cond) = &**data;
let mut data = lock.lock().unwrap();
let mut pos = None;
for (i, f) in data.sleeping.iter().enumerate() {
if Arc::ptr_eq(f, wf) {
pos = Some(i);
break;
}
}
if pos.is_none() {
// unknown future
return
}
let pos = pos.unwrap();
let f = data.sleeping.remove(pos);
data.need_poll.push(f);
cond.notify_one();
}
pub fn exec(&self) {
loop {
let (lock, cond) = &*self.data;
let mut data = lock.lock().unwrap();
if data.need_poll.is_empty() {
if data.sleeping.is_empty() {
// no tasks, we're done
break;
}
data = cond.wait(data).unwrap();
}
let need_poll = mem::replace(
&mut data.need_poll,
Vec::new()
);
mem::drop(data);
let mut need_sleep = Vec::new();
for f in need_poll {
let w = MyWaker {
data: Arc::clone(&self.data),
f: Arc::new(Mutex::new(Some(Arc::clone(&f)))),
}.into_task_waker();
let mut cx = Context::from_waker(&w);
let result = {
f.lock().unwrap().poll(&mut cx)
};
match result {
Poll::Ready(_) => {},
Poll::Pending => {
need_sleep.push(f);
},
}
}
let mut data = lock.lock().unwrap();
data.sleeping.append(&mut need_sleep);
}
}
}
The exec function loops, polling futures. First, it checks whether there are futures that need polling. If there aren’t any, it waits for a sleeping one to awaken. Once there are futures to poll, they are polled. If poll returns Ready, then the future has completed and we can simply let it go. If poll returns Pending, then we move the future into the sleeping vector. If there are no futures left, the loop exits.
In order to wake the executor, someone needs to call Executor::wake. It’s an associated function, intended to be called from another thread by MyWaker.
Here’s the code for MyWaker:
#[derive(Clone)]
struct MyWaker {
data: Arc<(Mutex<ExecutorData>, Condvar)>,
f: Arc<Mutex<Option<SharedFuture>>>,
}
impl MyWaker {
...
fn wake(mut self) {
self.wake_by_ref();
}
fn wake_by_ref(&mut self) {
let f: &mut Option<SharedFuture> = &mut self.f.lock().unwrap();
if f.is_some() {
let f: SharedFuture = f.take().unwrap();
Executor::wake(&mut self.data, &f);
}
}
}
Waker implementations are intended for one-time use but must also be clonable. This is why the inner SharedFuture is wrapped in an Option as well as another Arc/Mutex. The set of wakers for a particular future has safe shared access to a single Option<SharedFuture>. When wake() is called on any one of the wakers in the set, the future is awakened and the option is set to None.
In order for our waker implementation to be usable, we need to integrate it into the vtable so that it can be controlled by a Waker:
impl MyWaker {
...
fn into_task_waker(self) -> Waker {
let w = Box::new(self);
let rw = RawWaker::new(Box::into_raw(w) as *mut (), &VTABLE);
unsafe { Waker::from_raw(rw) }
}
...
}
unsafe fn vt_clone(data: *const ()) -> RawWaker {
let w = (data as *const MyWaker).as_ref().unwrap();
let new_w = Box::new(w.clone());
RawWaker::new(Box::into_raw(new_w) as *mut (), &VTABLE)
}
unsafe fn vt_wake(data: *const ()) {
let w = Box::from_raw(data as *mut MyWaker);
w.wake();
}
unsafe fn vt_wake_by_ref(data: *const ()) {
let w = (data as *mut MyWaker).as_mut().unwrap();
w.wake_by_ref();
}
unsafe fn vt_drop(data: *const ()) {
    drop(Box::from_raw(data as *mut MyWaker));
}
Basically, the above unsafe code wires up the vtable functions to MyWaker’s normal Rust methods, handling cloning and dropping.
Obviously this is not the most sophisticated executor, but it should work well enough as an example.
Using the executor
Let’s give it a spin! The sleep function is a wrapper around the TimerFuture we declared earlier.
fn main() {
let e = Executor::new();
e.spawn(async {
println!("a");
sleep(time::Duration::from_millis(200)).await;
println!("c");
});
e.spawn(async {
sleep(time::Duration::from_millis(100)).await;
println!("b");
sleep(time::Duration::from_millis(200)).await;
println!("d");
});
e.exec();
}
The output, as expected:
a
b
c
d
Full source here.