As a longtime C/C++ programmer who has done a fair amount of concurrency work (especially network stuff for Fanout), I’m intrigued by the Rust programming language and its recent async capabilities.

Like C/C++, Rust has no runtime. Interestingly, this remains true even with the introduction of async. Using the async or await keywords doesn’t just magically run code concurrently. It’s on you to bring your own runtime, such as tokio or async-std, to interface with the core language elements.

But you can also implement your own runtime! Over the past months I’ve been learning all the little implementation details. In this article I’ll describe how to execute Rust async functions using nothing but the standard library.

Much has already been written about async and futures elsewhere, so this article will focus mostly on how to build an executor.

Language vs. runtime

Basically, the Rust language provides:

  • The Future trait, which allows stepping through an execution of something.
  • The async keyword, which rewrites your code to implement Future.
  • The await keyword, which allows using other Future instances within async-generated code.

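And that’s it. Notably, Rust provides essentially no concrete implementations of Future outside of the ones you ask it to generate with the async keyword. (The standard library does include a few trivial ones, such as std::future::ready, but nothing that performs I/O or timing.)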

In order to do anything useful with Rust async, you’ll need some non-generated implementations of Future (using only generated futures is pointless), and you’ll need a way to execute the Future instances.

Personally, I think this is a brilliant design. Rust is able to provide nice async syntax without having to commit to a particular runtime.

Note that even though it sounds like the language itself doesn’t provide much, its built-in async code generation solves a challenging problem.
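To make the "no runtime" point concrete, here is a minimal sketch showing that creating a future runs none of its body, and that only a call to poll() makes progress. Polling itself is explained in the sections below. The names NoopWake and demo_laziness are made up for this illustration, and the do-nothing waker is built with the standard library’s std::task::Wake trait rather than the hand-written vtable used later in this article.

```rust
use std::cell::Cell;
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical waker that ignores every wake-up.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Returns (body ran before poll, body ran after poll, result of one poll).
fn demo_laziness() -> (bool, bool, Poll<i32>) {
    let ran = Cell::new(false);

    // Creating the future does not execute the block's body.
    let mut fut = Box::pin(async {
        ran.set(true);
        7
    });
    let before = ran.get();

    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    // Only polling drives the body forward.
    let result = fut.as_mut().poll(&mut cx);
    let after = ran.get();

    (before, after, result)
}
```

Calling demo_laziness() should report that the body had not run before the poll and had run after it.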

Implementing a Future

Here’s the declaration for the Future trait:

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

You can implement a Future by hand. For example, here’s one that produces an integer:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct IntFuture {
    x: i32,
}

impl Future for IntFuture {
    type Output = i32;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<i32> {
        Poll::Ready(self.x)
    }
}

Or you could generate the same thing using the async keyword:

async fn int_future(x: i32) -> i32 {
    x
}

In both of the above cases, we end up with a type that satisfies the trait Future<Output = i32>:

fn eat<F: Future<Output = i32>>(_: F) {}

fn main() {
    eat(IntFuture { x: 42 });
    eat(int_future(42));
}

Nested futures

If you have a chain of async functions, for example an HTTP request async function that calls some TCP I/O async functions, it will be compiled into a single encapsulating Future. Polling this future will cause the inner ones to get polled. Whatever is doing the polling has no awareness of any inner futures.

For example:

async fn get_audience() -> &'static str {
    "world"
}

async fn make_greeting() -> String {
    let audience = get_audience().await;

    format!("hello {}", audience)
}

In the above code, if make_greeting() is called to get a future, polling that future will in turn poll the future generated by get_audience(), but this could be considered an implementation detail of make_greeting().

All this to say: when it comes to executing futures, we really only need to account for the top-level futures.
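As a rough check of this claim, the sketch below polls only the outer future from the example above and still gets the value produced by the inner one. NoopWake and poll_once are hypothetical helpers introduced for this illustration; the waker is a do-nothing one built with std::task::Wake, which is enough here because neither function ever returns Pending.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

async fn get_audience() -> &'static str {
    "world"
}

async fn make_greeting() -> String {
    let audience = get_audience().await;

    format!("hello {}", audience)
}

// Hypothetical waker that ignores every wake-up.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical helper: polls a top-level future exactly once.
// The caller never sees, or needs to know about, any inner futures.
fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    let result = fut.as_mut().poll(&mut cx);
    result
}
```

Here poll_once(make_greeting()) completes on the first poll, because the inner future is ready immediately.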

Calling poll

Making a Future is somewhat straightforward. Polling a Future, not so much. Let’s look at the signature for poll() again:

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;

We need a Pin and a Context. What the heck are those, you might ask?

Pin

Pin is a way of indicating that some memory won’t be moved. Normally, structs can be moved from one memory location to another without any issue, because Rust forbids self-references in safe code. For example, a struct can exist on the stack, then be moved to a Box on the heap, and Rust can perform the move by simply copying the bytes à la memcpy. However, one of the great achievements of Rust async is borrowing across await points, and this requires bending the rules a little bit. An async-generated Future needs to be able to keep references to its own internal memory when await is used, and thus it needs assurance that its memory won’t be moved around between poll() calls.

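The way Pin is used here is a little unintuitive. The poll() function takes the Pin by value, so calling it consumes the Pin. This means that in order to poll a future repeatedly, you need a fresh Pin<&mut Self> for each call, either by constructing a new wrapper or by reborrowing an existing one with Pin::as_mut. Here’s how constructing one looks: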

let mut f = IntFuture { x: 42 };

let mut cx = ... // we'll talk about this later

let p = unsafe { Pin::new_unchecked(&mut f) };

match p.poll(&mut cx) {
    Poll::Ready(x) => println!("got int: {}", x),
    Poll::Pending => println!("future not ready"),
}

(side note: it’s interesting that Pin can be used as the self type. It seems Rust doesn’t limit self to just T, &T, and &mut T, but allows some other types from a fixed list, such as Box<Self>, Rc<Self>, Arc<Self>, and Pin wrapped around one of these pointer types).

Are you off the hook about keeping the memory pinned once the Pin has been consumed and dropped by the poll() method? Nope! Quoth the docs, “A value, once pinned, must remain pinned forever”. In fact, this is why constructing the Pin is unsafe. The unsafe part is you’re eventually going to lose the Pin, but you’ll still need to uphold the pinning contract despite not having the Pin to protect you anymore.
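For many cases the unsafe constructor can be avoided. The sketch below shows two safe alternatives that the standard library offers: Pin::new, which works when the future type is Unpin (as the hand-written IntFuture is, since it holds no self-references), and Box::pin, which moves the future to the heap and pins it there. NoopWake, poll_unpin, and poll_boxed are names invented for this illustration, and the do-nothing waker uses std::task::Wake in place of the vtable discussed in the next section.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct IntFuture {
    x: i32,
}

impl Future for IntFuture {
    type Output = i32;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<i32> {
        Poll::Ready(self.x)
    }
}

async fn int_future(x: i32) -> i32 {
    x
}

// Hypothetical waker that ignores every wake-up.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// IntFuture is Unpin, so the safe Pin::new constructor is available.
fn poll_unpin() -> Poll<i32> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    let mut f = IntFuture { x: 42 };
    let result = Pin::new(&mut f).poll(&mut cx);
    result
}

// Async-generated futures are not Unpin, but Box::pin pins them on the heap.
fn poll_boxed() -> Poll<i32> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    let mut f = Box::pin(int_future(42));
    // as_mut() reborrows the pinned box as a fresh Pin<&mut _> for each poll.
    let result = f.as_mut().poll(&mut cx);
    result
}
```

The trade-off with Box::pin is a heap allocation. Recent versions of the standard library also provide the std::pin::pin! macro for pinning a value on the stack without unsafe code.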

Context and Waker

Currently, the only thing Context does is provide access to a Waker. The Waker is used to indicate when the future should be polled again, if poll() returns Poll::Pending. The reason poll() takes a Context rather than simply a Waker is to enable expansion. Other things may be added to Context in later Rust versions.

Constructing a Context takes some effort. Its constructor, Context::from_waker, needs a Waker. The low-level constructor for Waker, Waker::from_raw, needs a RawWaker. And the sole constructor for RawWaker, RawWaker::new, needs a RawWakerVTable. Still with me?

Let’s implement a minimal RawWakerVTable of no-ops:

use std::task::{RawWaker, RawWakerVTable, Waker};

unsafe fn vt_clone(data: *const ()) -> RawWaker {
    RawWaker::new(data, &VTABLE)
}

unsafe fn vt_wake(_data: *const ()) {
}

unsafe fn vt_wake_by_ref(_data: *const ()) {
}

unsafe fn vt_drop(_data: *const ()) {
}

static VTABLE: RawWakerVTable = RawWakerVTable::new(
    vt_clone,
    vt_wake,
    vt_wake_by_ref,
    vt_drop
);

Then we can construct a Waker like this:

let rw = RawWaker::new(&(), &VTABLE);

let w = unsafe { Waker::from_raw(rw) };

All this vtable stuff is to allow us to provide our own waking behavior. RawWaker is just a data pointer and a vtable. Waker wraps that and implements familiar Rust traits like Clone and Drop. The Waker constructor is unsafe since the vtable functions may need to dereference raw pointers.

You may wonder why Rust uses this custom vtable instead of just making Waker a trait. I believe it’s done this way so Waker can be ownable while also avoiding a heap allocation. Using a trait probably would have required Box somewhere.

Finally, we can construct a Context:

let mut cx = Context::from_waker(&w);

Whew!

Of course, in a real application we’ll need our Waker to actually do something. We’ll discuss that later.
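For comparison, newer versions of the standard library also offer a trait-based route: implement std::task::Wake for a type and convert an Arc of it into a Waker. As the discussion above anticipates, this does cost a heap allocation (the Arc). The sketch below is not the approach used in the rest of this article; CountingWake and count_wakes are hypothetical names, and the waker merely counts how often it is woken.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

// Hypothetical waker state: just a counter of wake-ups.
struct CountingWake {
    wakes: AtomicUsize,
}

impl Wake for CountingWake {
    // Called for Waker::wake; the default wake_by_ref clones the Arc
    // and forwards to this method.
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}

// Wakes three times through different Waker methods and returns the count.
fn count_wakes() -> usize {
    let state = Arc::new(CountingWake {
        wakes: AtomicUsize::new(0),
    });

    // The standard library builds the RawWaker and vtable for us.
    let waker = Waker::from(Arc::clone(&state));

    waker.wake_by_ref(); // wake without giving up the waker
    waker.clone().wake(); // wake through a clone, consuming the clone
    waker.wake(); // wake and consume the original

    state.wakes.load(Ordering::SeqCst)
}
```

The hand-written vtable remains useful when you want to avoid the Arc or need full control over how the data pointer is managed.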

Calling poll, for real this time

Now that we know how to construct a Pin and a Context, we can call poll(). Here’s the entire source of a program that polls a future:

use std::future::Future;
use std::pin::Pin;
use std::task::{Poll, Context, Waker, RawWaker, RawWakerVTable};

unsafe fn vt_clone(data: *const ()) -> RawWaker {
    RawWaker::new(data, &VTABLE)
}

unsafe fn vt_wake(_data: *const ()) {
}

unsafe fn vt_wake_by_ref(_data: *const ()) {
}

unsafe fn vt_drop(_data: *const ()) {
}

static VTABLE: RawWakerVTable = RawWakerVTable::new(
    vt_clone,
    vt_wake,
    vt_wake_by_ref,
    vt_drop
);

async fn get_greeting() -> &'static str {
    "hello world"
}

fn main() {
    let mut f = get_greeting();

    let rw = RawWaker::new(&(), &VTABLE);
    let w = unsafe { Waker::from_raw(rw) };
    let mut cx = Context::from_waker(&w);

    let p = unsafe { Pin::new_unchecked(&mut f) };
    assert_eq!(p.poll(&mut cx), Poll::Ready("hello world"));
}

Triggering the Waker

Let’s make a future that knows how to wake itself.

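Below is an implementation of a timer. It is constructed with a desired duration. The first time it is polled, it spawns a thread and returns Poll::Pending. The thread sleeps for the duration and then calls wake(). The next time it is polled, it joins the thread and returns Poll::Ready. If it were polled again before the timer fired, the join would simply block until the thread finished.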

use std::time;
use std::thread;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct TimerFuture {
    duration: time::Duration,
    handle: Option<thread::JoinHandle<()>>,
}

impl TimerFuture {
    fn new(duration: time::Duration) -> Self {
        Self {
            duration,
            handle: None,
        }
    }
}

impl Future for TimerFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
        match &self.handle {
            None => {
                let duration = self.duration;
                let waker = cx.waker().clone();
                self.handle = Some(thread::spawn(move || {
                    thread::sleep(duration);
                    waker.wake();
                }));
                Poll::Pending
            },
            Some(_) => {
                let handle = self.handle.take().unwrap();
                handle.join().unwrap();
                Poll::Ready(())
            },
        }
    }
}

// convenience wrapper for use in async functions
fn sleep(duration: time::Duration) -> TimerFuture {
    TimerFuture::new(duration)
}

The Waker is cloned so we can keep it around after poll() returns. In fact, we move it to the other thread.

Note that in a real application you would not want to spawn a thread for every timer. Instead, timers would probably be registered with some evented reactor. For this example though, we’ll keep it simple.
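As a sketch of one possible refinement (still one thread per timer), the variant below keeps a flag and the latest waker in shared state, so poll() never blocks even if it is called before the timer fires. To show the waker actually doing something, it is driven by a tiny block_on that parks the calling thread until woken. SharedTimer, Shared, ThreadWake, block_on, and timed_sleep are all hypothetical names for this illustration, and the waker is built with std::task::Wake rather than a hand-written vtable.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::{Duration, Instant};

// State shared between the future and its timer thread.
struct Shared {
    done: bool,
    waker: Waker,
}

// Hypothetical timer future whose poll() never blocks.
struct SharedTimer {
    duration: Duration,
    shared: Option<Arc<Mutex<Shared>>>,
}

impl Future for SharedTimer {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if let Some(shared) = &self.shared {
            let mut s = shared.lock().unwrap();
            if s.done {
                return Poll::Ready(());
            }
            // Polled early: remember the latest waker and keep waiting.
            s.waker = cx.waker().clone();
            return Poll::Pending;
        }

        // First poll: start the timer thread.
        let shared = Arc::new(Mutex::new(Shared {
            done: false,
            waker: cx.waker().clone(),
        }));
        let for_thread = Arc::clone(&shared);
        let duration = self.duration;
        thread::spawn(move || {
            thread::sleep(duration);
            let mut s = for_thread.lock().unwrap();
            s.done = true;
            s.waker.wake_by_ref();
        });
        self.shared = Some(shared);
        Poll::Pending
    }
}

// Waker that unparks the thread blocked in block_on.
struct ThreadWake(Thread);

impl Wake for ThreadWake {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Minimal single-future driver: poll, then park until woken.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWake(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
            return v;
        }
        // park() may return spuriously; the loop just polls again.
        thread::park();
    }
}

// Runs one timer to completion and reports how long it took.
fn timed_sleep(ms: u64) -> Duration {
    let start = Instant::now();
    block_on(SharedTimer {
        duration: Duration::from_millis(ms),
        shared: None,
    });
    start.elapsed()
}
```

Because the future re-checks the flag on every poll, a spurious return from park() is harmless: the timer is simply polled again and returns Pending until the thread has set done.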

Managing different kinds of futures

Before we get to the executor, we need to address one last challenge: varying Future types.

Different futures can have different Output types (e.g. Future<Output = i32> and Future<Output = String>), and thus different poll() return values. This means if we build an executor, we can’t simply throw the futures into something like Vec<Box<dyn Future>>, and even if that were possible we wouldn’t be able to process them using the same code.

The solution to this, as far as I can tell, is to pick a common return type for all of the futures tracked by the executor (i.e. the top-level futures). For example, you could decide that all top-level futures should have no return type, so you can contain them in a Vec<Box<dyn Future<Output = ()>>>. Note that nested futures can still have arbitrary return types. An async function with no return value can await a future that returns String. This works because all the nested futures are hidden within the outer future, and the executor only needs to care about the outer future.
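One simple way to get a future with Output = () out of one that returns something else is to await it inside an async block and drop the result. The sketch below assumes a made-up helper named discard_output, plus hypothetical NoopWake and poll_discarded helpers to demonstrate it; in practice you would usually do something useful with the value inside the block instead of discarding it.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

async fn make_greeting() -> String {
    String::from("hello world")
}

// Hypothetical helper: wraps any future in an outer future with Output = ().
// The inner future's String output stays hidden inside the outer one.
fn discard_output<F: Future>(fut: F) -> impl Future<Output = ()> {
    async move {
        let _ = fut.await;
    }
}

// Hypothetical waker that ignores every wake-up.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls the wrapped future once, the way an executor would.
fn poll_discarded() -> Poll<()> {
    let mut fut = Box::pin(discard_output(make_greeting()));
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    let result = fut.as_mut().poll(&mut cx);
    result
}
```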

Our typing problems don’t end there though. The poll() function requires a pinned reference to its concrete type. Recall the capitalized Self in its signature:

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;

At first glance this suggests that even if two different future implementations have the same Output type, and thus conform to the same trait, we can’t process them using non-generic code. What the what?

It turns out the trait can still be used as a trait object: Self is allowed behind a Pin<&mut Self> receiver, so a Pin<Box<dyn Future<Output = ()>>> (as produced by Box::pin) can be polled through its as_mut() method without knowing the concrete type. What a bare Box<dyn Future> lacks is the pinning guarantee, since a dyn Future is not Unpin.

To make the mechanics explicit, though, this article hides the details in closures instead. We can use monomorphization to generate different code for each concrete future implementation, but have the closures share the same function signature. Below, we create closures that conform to the trait dyn FnMut(&mut Context) -> Poll<()> (for futures with Output = ()) and box them:

type PollFn = dyn FnMut(&mut Context) -> Poll<()>;

struct WrappedFuture {
    poll_fn: Box<PollFn>,
}

impl WrappedFuture {
    pub fn new<F>(mut f: F) -> Self
    where
        F: Future<Output = ()> + 'static
    {
        let c = move |cx: &mut Context| {
            let p: Pin<&mut F> = unsafe { Pin::new_unchecked(&mut f) };
            match p.poll(cx) {
                Poll::Ready(_) => Poll::Ready(()),
                Poll::Pending => Poll::Pending,
            }
        };

        Self {
            poll_fn: Box::new(c),
        }
    }

    pub fn poll(&mut self, cx: &mut Context) -> Poll<()> {
        (self.poll_fn)(cx)
    }
}

With WrappedFuture, we can treat all of our futures the same:

// generates Future<Output = ()>
async fn print_hello() {
    println!("hello");
}

// generates Future<Output = ()>
async fn print_goodbye() {
    println!("goodbye");
}

fn main() {
    let mut futures: Vec<WrappedFuture> = Vec::new();

    futures.push(WrappedFuture::new(print_hello()));
    futures.push(WrappedFuture::new(print_goodbye()));

    for f in futures.iter_mut() {
        let mut cx = ... // context
        assert_eq!(f.poll(&mut cx), Poll::Ready(()));
    }
}
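For comparison, the same kind of list can be built from pinned, boxed trait objects, since Pin<Box<dyn Future<Output = ()>>> can be polled without knowing the concrete type. The sketch below is an alternative to WrappedFuture, not what the rest of this article uses; BoxedFuture, NoopWake, and poll_all_once are hypothetical names, and the do-nothing waker is built with std::task::Wake.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

// Hypothetical alias for a type-erased, heap-pinned future.
type BoxedFuture = Pin<Box<dyn Future<Output = ()>>>;

async fn print_hello() {
    println!("hello");
}

async fn print_goodbye() {
    println!("goodbye");
}

// Hypothetical waker that ignores every wake-up.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls every future once and returns how many completed.
fn poll_all_once() -> usize {
    let mut futures: Vec<BoxedFuture> = Vec::new();

    // Box::pin yields Pin<Box<ConcreteFuture>>, which coerces to BoxedFuture.
    futures.push(Box::pin(print_hello()));
    futures.push(Box::pin(print_goodbye()));

    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    let mut ready = 0;
    for f in futures.iter_mut() {
        // as_mut() turns &mut Pin<Box<dyn Future>> into Pin<&mut dyn Future>.
        if f.as_mut().poll(&mut cx).is_ready() {
            ready += 1;
        }
    }
    ready
}
```

Both approaches pay for one heap allocation per top-level future. The closure version spells out the type erasure by hand, while the trait-object version leans on dynamic dispatch.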

A simple executor

To execute our futures, we need to do three things:

  • Keep track of the futures somewhere.
  • Poll the futures when they are created.
  • Implement Waker so we know when to poll them again.

Below is a basic executor. It has two vectors, need_poll and sleeping, to keep track of the futures. Calling spawn adds a future to need_poll.

Instead of working with WrappedFuture directly, we wrap it with Arc/Mutex so the futures can be shared across threads. We declare an alias, SharedFuture, to cut down on the noise.

use std::mem;
use std::sync::{Arc, Condvar, Mutex};

type SharedFuture = Arc<Mutex<WrappedFuture>>;

struct ExecutorData {
    need_poll: Vec<SharedFuture>,
    sleeping: Vec<SharedFuture>,
}

struct Executor {
    data: Arc<(Mutex<ExecutorData>, Condvar)>,
}

impl Executor {
    pub fn new() -> Self {
        let data = ExecutorData {
            need_poll: Vec::new(),
            sleeping: Vec::new(),
        };

        Self {
            data: Arc::new((Mutex::new(data), Condvar::new())),
        }
    }

    pub fn spawn<F>(&self, f: F)
    where
        F: Future<Output = ()> + 'static
    {
        let (lock, _) = &*self.data;

        let mut data = lock.lock().unwrap();

        data.need_poll.push(Arc::new(Mutex::new(WrappedFuture::new(f))));
    }

    pub fn wake(
        data: &mut Arc<(Mutex<ExecutorData>, Condvar)>,
        wf: &SharedFuture
    ) {
        let (lock, cond) = &**data;

        let mut data = lock.lock().unwrap();

        // Look for the future among the sleepers.
        let pos = match data.sleeping.iter().position(|f| Arc::ptr_eq(f, wf)) {
            Some(pos) => pos,
            // unknown future
            None => return,
        };

        let f = data.sleeping.remove(pos);
        data.need_poll.push(f);

        cond.notify_one();
    }

    pub fn exec(&self) {
        loop {
            let (lock, cond) = &*self.data;

            let mut data = lock.lock().unwrap();

            if data.need_poll.is_empty() {
                if data.sleeping.is_empty() {
                    // no tasks, we're done
                    break;
                }

                data = cond.wait(data).unwrap();
            }

            let need_poll = mem::take(&mut data.need_poll);

            mem::drop(data);

            let mut need_sleep = Vec::new();

            for f in need_poll {
                let w = MyWaker {
                    data: Arc::clone(&self.data),
                    f: Arc::new(Mutex::new(Some(Arc::clone(&f)))),
                }.into_task_waker();

                let mut cx = Context::from_waker(&w);

                let result = {
                    f.lock().unwrap().poll(&mut cx)
                };
                match result {
                    Poll::Ready(_) => {},
                    Poll::Pending => {
                        need_sleep.push(f);
                    },
                }
            }

            let mut data = lock.lock().unwrap();

            data.sleeping.append(&mut need_sleep);
        }
    }
}

The exec function loops and polls futures. First, it checks if there are futures that need polling. If there aren’t any, it will wait for a sleeping one to awaken. Once there are futures to poll, they are polled. If poll returns Ready, then the future has completed and we can simply let it go. If poll returns Pending, then we move the future over into the sleeping vector. If there are no futures left, the loop exits.

In order to wake the executor, someone needs to call Executor::wake. It’s an associated function intended to be called from another thread by MyWaker.

Here’s the code for MyWaker:

#[derive(Clone)]
struct MyWaker {
    data: Arc<(Mutex<ExecutorData>, Condvar)>,
    f: Arc<Mutex<Option<SharedFuture>>>,
}

impl MyWaker {
    ...

    fn wake(mut self) {
        self.wake_by_ref();
    }

    fn wake_by_ref(&mut self) {
        let f: &mut Option<SharedFuture> = &mut self.f.lock().unwrap();
        if f.is_some() {
            let f: SharedFuture = f.take().unwrap();
            Executor::wake(&mut self.data, &f);
        }
    }
}

In this executor, a wake only needs to take effect once per poll, but wakers must also be clonable. This is why the inner SharedFuture is wrapped by Option as well as another Arc/Mutex. The set of wakers for a particular future has safe shared access to a single Option<SharedFuture>. When wake() is called on any one of the wakers in the set, the future is awakened and the option is set to None.

In order for our waker implementation to be usable, we need to integrate it into the vtable so that it can be controlled by Waker:

impl MyWaker {
    ...

    fn into_task_waker(self) -> Waker {
        let w = Box::new(self);
        let rw = RawWaker::new(Box::into_raw(w) as *mut (), &VTABLE);
        unsafe { Waker::from_raw(rw) }
    }

    ...
}

unsafe fn vt_clone(data: *const ()) -> RawWaker {
    let w = (data as *const MyWaker).as_ref().unwrap();
    let new_w = Box::new(w.clone());

    RawWaker::new(Box::into_raw(new_w) as *mut (), &VTABLE)
}

unsafe fn vt_wake(data: *const ()) {
    let w = Box::from_raw(data as *mut MyWaker);
    w.wake();
}

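unsafe fn vt_wake_by_ref(data: *const ()) {
    // Waker::wake_by_ref takes &self and may run on several threads at once,
    // so only a shared reference is formed here; waking goes through a clone.
    let w = (data as *const MyWaker).as_ref().unwrap();
    w.clone().wake();
}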

unsafe fn vt_drop(data: *const ()) {
    // Reclaim the box so that MyWaker is dropped and its memory freed.
    drop(Box::from_raw(data as *mut MyWaker));
}

Basically the above unsafe code wires up the vtable functions to MyWaker’s normal Rust methods, handling cloning and dropping.

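Obviously this is not the most sophisticated executor, but it works well enough as an example. One caveat: a wake that arrives before exec has moved the just-polled future into the sleeping vector is treated as an unknown future and dropped, so a future that wakes itself immediately would never be polled again.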
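In the executor above, Executor::wake ignores a future that is not yet in the sleeping vector. One way to avoid depending on that timing is to have the waker push its task onto a run queue unconditionally. The sketch below does this with a standard library channel and the std::task::Wake trait. It is a different design from the one in this article, shown only as an illustration; Task, BoxFuture, YieldOnce, run_all, and demo_polls are hypothetical names.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

// One top-level future plus a handle to the run queue.
struct Task {
    future: Mutex<Option<BoxFuture>>,
    queue: Mutex<Sender<Arc<Task>>>,
}

impl Wake for Task {
    fn wake(self: Arc<Self>) {
        // Re-queue unconditionally, so a wake that arrives while the task
        // is still being polled is not lost.
        let task = Arc::clone(&self);
        let _ = self.queue.lock().unwrap().send(task);
    }
}

// Test future: wakes itself immediately, returns Pending once, then Ready.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// Runs all futures to completion and returns the total number of polls.
fn run_all(futures: Vec<BoxFuture>) -> usize {
    let (tx, rx) = channel::<Arc<Task>>();
    let mut remaining = futures.len();
    for f in futures {
        let task = Arc::new(Task {
            future: Mutex::new(Some(f)),
            queue: Mutex::new(tx.clone()),
        });
        tx.send(task).unwrap();
    }

    let mut polls = 0;
    while remaining > 0 {
        // Blocks until some task has been queued by spawn or by a waker.
        let task = rx.recv().unwrap();
        let mut slot = task.future.lock().unwrap();
        // A finished task may still be queued by a stale waker; skip it.
        if let Some(mut fut) = slot.take() {
            let waker = Waker::from(Arc::clone(&task));
            let mut cx = Context::from_waker(&waker);
            polls += 1;
            if fut.as_mut().poll(&mut cx).is_pending() {
                *slot = Some(fut);
            } else {
                remaining -= 1;
            }
        }
    }
    polls
}

// Two tasks: one yields once (two polls), one finishes at once (one poll).
fn demo_polls() -> usize {
    let first: BoxFuture = Box::pin(async { YieldOnce { yielded: false }.await });
    let second: BoxFuture = Box::pin(async {});
    run_all(vec![first, second])
}
```

The cost of this design is that a task can be queued more than once and polled when it has nothing new to do, which is why the loop checks whether the future slot is still occupied.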

Using the executor

Let’s give it a spin! The sleep function is a wrapper around TimerFuture which we declared earlier.

fn main() {
    let e = Executor::new();

    e.spawn(async {
        println!("a");
        sleep(time::Duration::from_millis(200)).await;
        println!("c");
    });

    e.spawn(async {
        sleep(time::Duration::from_millis(100)).await;
        println!("b");
        sleep(time::Duration::from_millis(200)).await;
        println!("d");
    });

    e.exec();
}

The output, as expected:

a
b
c
d

Full source here.