Rust no-stdのasync完全理解を目指そう!

はじめに

この記事はRust Advent Calendar 2019の17日目として書きました。

組込みRust界の神japaricさんがno-std環境でasyncを使うPoCレポジトリを公開しています。

github.com

理解できるかどうか非常に自信がありませんが、これは見てみるしかありません!

後日正式な記事が書かれるそうなので、それを待ったほうが得策かもしれません!

引用の領域超えている気がしますので、一応ライセンス表記します。 今回解説するレポジトリは、MIT license、もしくは、Apache License, Version 2.0、でライセンスされています。

目次

自分なりのまとめ

  • 組込みのno-std環境で使えるasync-awaitのproof of conceptを紹介するよ (nightlyは必要だけどね!)
  • cooperativeスケジューラ (executor) は割り込みハンドラから完全に隔離するよ
  • なので割り込みハンドラで実行されるリアルタイム性が要求されるコードの予測性を損なわずに、cooperativeなコードを実行できるよ
  • そのために、executor#[global_allocator]とは違う専用のメモリアロケータを使うよ
  • 現在Rustのcollectionはglobal_allocatorを使うようにハードコーディングされているので、collectionを書き直す必要があるよ

つまるところ、async-awaitを使うと、リアルタイムタスクの最悪実行時間が計算しずらくなるため、割り込みコンテキストと完全に分離できるように、async-awaitno-std環境で使えるようにした、という話のようです。 現在のところ、no-std環境はCortex-Mに限定されています。

実装も覗いてみましたが、手続きマクロ以外は割と読めそうな感じです。

ツール

nightlyツールチェインと、Cortex-M3向けのクロスビルド環境が必要です。

$ rustup override set nightly
$ rustup update
$ rustup target add thumbv7-none-eabi

後、実行環境として、qemu-system-armを使います。

$ cargo run --example async-await --features="nightly"

README

まずなにはともあれ、プロジェクトのREADMEを見てみましょう。 と思ったらREADMEだけで1100行あるではありませんか! これは前途多難な予感です…。

Goal

Real Time For the Masses (RTFM) と一緒に使えるリアルタイムアプリケーション向けのcooperativeスケジューラを作ることが目標のようです。 cooperativeスケジューラは最悪実行時間の解析が難しくなります。

Background

Asynchronous code

皆さんご存知のようにRust 1.39 からasync / await の機能が安定化しました。

非同期コードはexecutorによって実行されることを意味します。 executorは標準ライブラリでは提供されていませんが、async-stdtokioといったマルチスレッドexecutor crateがあります。

async fnインスタンスtaskになり、executorがスケジューリング、実行します。

// toolchain: 1.39.0
// async-std = "1.2.0"

use async_std::task;

fn main() {
    // schedule one instance of task `foo` -- nothing is printed at this point
    task::spawn(foo());

    println!("start");
    // start task `bar` and drive it to completion
    // this puts the executor to work
    // it's implementation defined whether `foo` or `bar` runs first or
    // whether `foo` gets to run at all
    task::block_on(bar());
}

async fn foo() {
    println!("foo");
}

async fn bar() {
    println!("bar");
}
$ cargo run
start
foo
bar

サンプルコード内のコメントによると、タスクの実行順序はexecutorの実装依存なのですね。 勉強になります!

executorはタスクを協調的に実行します。 最も単純な場合、.awaitに到達するまでタスクを実行し、実行をブロックする必要がある場合はそのタスクをサスペンドし、別のタスクをresumeします。

Some implementation details

シンタックス的には、ジェネレータは、サスペンションポイント (yield) を含む、クロージャのような (|| { .. }) ものです。

// toolchain: nightly-2019-12-02

use core::{pin::Pin, ops::Generator};

fn main() {
    let mut g = || {
        println!("A");
        yield;
        println!("B");
        yield;
        println!("C");
    };

    let mut state = Pin::new(&mut g).resume();
    println!("{:?}", state);
    state = Pin::new(&mut g).resume();
    println!("{:?}", state);
    state = Pin::new(&mut g).resume();
    println!("{:?}", state);
}

セマンティクス的には、ジェネレータはyield間の状態マシンで、外部からresumeされることで状態遷移します。 ふむふむ。

executorは状態マシンのリストを持っていて、全ての状態マシンが完了になるまで、resumeし続けます。 各状態マシンは異なるサイズで違うコードを実行するので、トレイトオブジェクト (Box<dyn Generator>) としてリストされます。

これはサンプルコードを見ると理解しやすいです。

fn executor(mut tasks: Vec<Pin<Box<dyn Generator<Yield = (), Return = ()>>>>) {
    let mut n = tasks.len();
    while n != 0 {
        for i in (0..n).rev() {
            let state = tasks[i].as_mut().resume();
            if let GeneratorState::Complete(()) = state {
                tasks.swap_remove(i); // done; remove
            }
        }

        n = tasks.len();
    }
}

なるほど。タスク (状態マシン) をVec<Box<dyn Generator>>として受け取り、各タスクのresumeを呼ぶ。 状態が完了になると、Vecからswap_removeする、と。

Idea

アプローチとしては、非同期コードを#[idle]もしくはfn mainに隔離します。 その理由は

  • 協調的タスクには終了しないものと、短期間で終了するもがあるため、終了しない#[idle]タスクでexecutorを動かすのが賢明である
  • executorで必要となる動的メモリ確保は、#[idle]に制限されます。#[idle]内ではリアルタイムでないアロケータを使用し、通常のタスクは動的メモリ確保をしないようにします。アロケータを#[idle]内で排他的に使用することで、mutexなどの排他制御が不要になります。

ふむ、よくわからないので、もう少し先を見てみましょう。

Implementation

実装には2つのコンポーネントがあります。「スレッドモード」アロケータと「スレッドモード」executorです。 「スレッドモード」はARMの「スレッドモード」を意味しています。

アロケータとexecutorは、「スレッドモード」でのみ利用できます。 ARMの「ハンドラモード」ではアロケータとexecutorにアクセスできません。

「ハンドラモード」は主に割り込みや例外を処理するためのモードです。

Cortex-Mで、リセットハンドラは「スレッドモード」で実行されます。

RTFMアプリでは、#[init]#[idle]は「スレッドモード」で実行します。

TM (Thread-Mode) allocator

TMアロケータはseparate allocatorです。ん?どういうことでしょう? #[global_allocator]で定義されているものとは、独立アロケータです。ああ、そういうこと。

理想的には、RFC #1398で提案されているallocator-genericコレクションをAllocトレイトを通じて使えると良いのですが、Allocトレイトは安定化していませんし、allocator-genericコレクションは存在していません。

あー、なるほど。C++のようにカスタムアロケータが設定できるコレクションが提案されているのですね?(要確認) 今のコレクションは、#[global_allocator]を使うようにハードコーディングされています。

TMアロケータはstableで実装できますが、コレクションはそうではないようです。 Rcで使用するcore::intrinsics::abortがunstableであるなどの理由で。 stableではコレクションの型強制もできないようです。Box<impl Generator>は使えないため、Box<dyn Generator>を使います。

// toolchain: 1.39.0

use cortex_m_tm_alloc::allocator;
use tlsf::Tlsf;

#[allocator(lazy)]
static mut A: Tlsf = {
    // `MEMORY` is transformed into `&'static mut [u8; 64]`
    static mut MEMORY: [u8; 64] = [0; 64];

    let mut tlsf = Tlsf::new();
    tlsf.extend(MEMORY);
    tlsf
};

TMアロケータをAという名前で定義します。Aは実行時に[TLSF]アロケータを初期化します。

Aアロケータのハンドラはgetコンストラクタで取得します。 getコンストラクタは、Option<A>を返します。「スレッドモード」で呼び出すとSomeヴァリアントが、「ハンドラモード」で呼び出すとNoneが帰ります。 Aは、CopyAllocトレイトを実装したサイズ0の型です。SendSyncトレイトは実装していないため、インスタンスを割り込み / 例外ハンドラに渡すことができません。

な、なるほど…。

#[entry]
fn main() -> ! {
    hprintln!("before A::get()").ok();
    SCB::set_pendsv();

    if let Some(a) = A::get() {
        hprintln!("after A::get()").ok();
        SCB::set_pendsv();

        // ..
    } else {
        // UNREACHABLE
    }

    // ..
}

#[exception]
fn PendSV() {
    hprintln!("PendSV({:?})", A::get()).ok();
}
$ cargo run
before A::get()
PendSV(None)
after A::get()
PendSV(None)

PendSVハンドラ内では、A::get()してもNoneになっています。 #[entry]内では、Someヴァリアントが得られていますね(2回めのPendSVに突入していることから)。

if let Some(a) = A::get() {
    // ..

    let mut xs: Vec<i32, A> = Vec::new(a);

    for i in 0.. {
        xs.push(i);
        hprintln!("{:?}", xs).ok();
    }
}

一度アロケータインスタンスを取得すれば、コレクションの初期化時にアロケータのコピーを渡すことで、アロケータを使用できます。 アロケータはサイズ0の型なので、スタックサイズは増えません。

if let Some(a) = A::get() {
    // ..

    let mut xs: Vec<i32, A> = Vec::new(a);

    for i in 0.. {
        xs.push(i);
        hprintln!("{:?}", xs).ok();
    }
}

グローバルアロケータと同様に、TMアロケータもOut Of Memoryになる可能性があります。 その場合、#[oom]アトリビュートを使って定義されたOut Of Memoryハンドラが呼ばれます。

#[alloc_oom::oom]
fn oom(layout: Layout) -> ! {
    hprintln!("oom({:?})", layout).ok();
    debug::exit(debug::EXIT_FAILURE);
    loop {}
}
$ cargo run
[0]
[0, 1]
[0, 1, 2]
[0, 1, 2, 3]
oom(Layout { size_: 32, align_: 4 })

$ echo $?
1

TM (Thread-Mode) executor

TM executorはTMアロケータに依存しています。

// toolchain: nightly-2019-12-02

use cortex_m_tm_alloc::allocator;
use cortex_m_tm_executor::executor;

#[allocator(lazy)]
static mut A: Tlsf = { /* .. */ };

executor!(name = X, allocator = A);

TMアロケータ同様に、TM executorも「スレッドモード」のみでハンドラを取得できます。 getコンストラクタはTM executorとTMアロケータを返します。 TM executorもCopyトレイトを実装しますが、SendSyncは実装しません。

executorはタスクをspawnするのに使います。 spawnは、具体的なジェネレータを受け取り、Box化し、内部キューに格納します。 spawn自体は、ジェネレータ / タスクコードを実行しません! タスクを実行するにはblock_on APIを使います。

ふむ?とりあえずサンプルコードを見てみますか。

|> examples/tasks.rs

#[entry]
fn main() -> ! {
    if let Some((x, _a)) = X::get() {
        x.spawn(move || {
            hprintln!(" A0").ok();
            yield;

            hprintln!(" A1").ok();
            // but of course you can `spawn` a task from a spawned task
            x.spawn(|| {
                hprintln!("  C0").ok();
                yield;

                hprintln!("  C1").ok();
            });
            yield;

            hprintln!(" A2").ok();
            // NOTE return value will be discarded
            42
        });

        let ans = x.block_on(|| {
            hprintln!("B0").ok();
            yield;

            hprintln!("B1").ok();
            yield;

            hprintln!("B2").ok();
            yield;

            hprintln!("B3").ok();

            42
        });

        hprintln!("the answer is {}", ans).ok();
    }

    debug::exit(debug::EXIT_SUCCESS);

    loop {}
}

上記コードを実行した結果は、次のようになります。

$ cargo run --example tasks --features="nightly"
B0
 A0
B1
 A1
B2
  C0
 A2
B3
the answer is 42

もう一度サンプルコードに戻ってみると、下のジェネレータをexecutor Xspawnした時点では、まだ非同期コードは実行されません。

#[entry]
fn main() -> ! {
    if let Some((x, _a)) = X::get() {
        x.spawn(move || {
            hprintln!(" A0").ok();
            yield;

            hprintln!(" A1").ok();
            // but of course you can `spawn` a task from a spawned task
            x.spawn(|| {
                hprintln!("  C0").ok();
                yield;

                hprintln!("  C1").ok();
            });
            yield;

            hprintln!(" A2").ok();
            // NOTE return value will be discarded
            42
        });

続く、block_onでジェネレータが与えられると、実行を開始します。 B0出力後、yieldすると、先ほどspawnしたタスクに制御が移り、A0が出力されます。

        let ans = x.block_on(|| {
            hprintln!("B0").ok();
            yield;

            hprintln!("B1").ok();
            yield;

            hprintln!("B2").ok();
            yield;

            hprintln!("B3").ok();

            42
        });

        hprintln!("the answer is {}", ans).ok();
    }

A1ではさらにタスクをspawnしていますが、その時点では実行されておらず、B2出力後のyieldで制御が移ってきます。 C0後にyieldすると、A2を出力するコードに制御が移っていますね。

block_onで実行したジェネレータからは、戻り値 (42) を受け取っています。

でもC1が実行されていませんね?

そう、block_onはspawnしたタスク全てが完了になることを保証しません。 単に、引数で渡されたジェネレータが完了になるまで、実行を進めるだけです。

block_onをネストするとデッドロックする可能性があるため、TM executorではネストしたblock_on呼び出しはパニックになるよう、実装されています。

#[r#async] / r#await!

block_onをネストできないとすると、ジェネレータをどうやったら完了状態にできるのでしょうか? r#await!マクロを使います。ジェネレータを返す関数を簡単に書くために#[r#async]アトリビュートもあります。

例として、割り込みハンドラから非同期にデータを受け取りたいとします。#[r#async]を使って次のように書くことができます。 まずは、ジェネレータを簡単に書くためのアトリビュートです。

use core::ops::Generator;

use heapless::{
    spsc::Consumer, // consumer endpoint of a single-producer single-consumer queue
    ArrayLength,
};
use gen_async_await::r#async;

#[r#async]
fn dequeue<T, N>(mut c: Consumer<'static, T, N>) -> (T, Consumer<'static, T, N>)
where
    N: ArrayLength<T>,
{
    loop {
        if let Some(x) = c.dequeue() {
            break (x, c);
        }
        yield
    }
}

// OR you could have written this; both are equivalent
fn dequeue2<T, N>(
    mut c: Consumer<'static, T, N>,
) -> impl Generator<Yield = (), Return = (T, Consumer<'static, T, N>)>
where
    N: ArrayLength<T>,
{
    || loop {
        if let Some(x) = c.dequeue() {
            break (x, c);
        }
        yield
    }
}

dequeueはジェネレータを返す関数で、ジェネレータでは割り込みハンドラのProducerからデータが送られてくるとSomeになるから、そこで値を返して、yieldしています。 んー?ジェネレータが複雑な型になったりすると有り難いのかな…?

アプリケーションは次のように書けます。

#[entry]
fn main() -> ! {
    static mut Q: Queue<i32, consts::U4> = Queue(i::Queue::new());

    let (p, mut c) = Q.split();

    // send the producer to an interrupt handler
    send(p);

    if let Some((x, _a)) = X::get() {
        // task that asynchronously processes items produced by
        // the interrupt handler
        x.spawn(move || loop {
            let ret = r#await!(dequeue(c)); // <- ★
            let item = ret.0;
            c = ret.1;
            // do stuff with `item`
        });

        x.block_on(|| {
            // .. do something else ..
        });
    }

    debug::exit(debug::EXIT_SUCCESS);

    loop {}
}

うーん、難しい。dequeue(c)はジェネレータを返しているので、x.spawnの引数もジェネレータになると。

std_async::sync::Mutex?

std_asyncでは、2つのタスク間でメモリを共有したい場合、MutexRwLockを使います。 task::spawn APIの引数はSendトレイトを実装したジェネレータです。 std_asyncのジェネレータはマルチスレッドで動作し、並行実行される可能性があるからです。

use async_std::{sync::Mutex, task};

fn main() {
    let shared: &'static Mutex<u128> = Box::leak(Box::new(Mutex::new(0u128)));

    task::spawn(async move {
        let x = shared.lock().await;
        println!("{}", x);
    });

    task::block_on(async move {
        *shared.lock().await += 1;
    });
}

TM executorは必ず同じコンテキストで動作し、タスクは1つずつ順番に実行されます。 そのためspawnの引数はSendを実装する必要がありません。 そのため、タスク間でデータを共有する際、Mutexの代わりに、単にRefCellCellを使うことができます。

あ、そうですね。

use core::cell::RefCell;

#[entry]
fn main() -> ! {
    static mut SHARED: RefCell<u64> = RefCell::new(0);

    if let Some((x, _a)) = X::get() {
        let shared: &'static _ = SHARED;

        x.spawn(move || loop {
            hprintln!("{}", shared.borrow()).ok();
            yield;
        });

        x.block_on(move || {
            *shared.borrow_mut() += 1;
            yield;
            *shared.borrow_mut() += 1;
            yield;
        });
    }

    debug::exit(debug::EXIT_SUCCESS);

    loop {}
}

うん、これはよくわかる。

実装を覗いてみよう

さて、ここまでがREADMEです (長い…) 。

r#await! / #[r#async]

r#await!マクロは、わりと読めます。

|> gen-async-await/src/lib.rs

/// `e.await` -> `r#await(e)`
// expansion is equivalent to the desugaring of `($g).await` -- see
// rust-lang/rust/src/librustc/hir/lowering/expr.rs (Rust 1.39)
// XXX Does `$g` need to satisfy the `: Unpin` bound? -- I think not because `$g` is drived to
// completion so any self-referential borrow will be over by the time this macro returns control
// back to the caller. This is unlike `futures::select!` which partially polls its input futures.
// Those input futures may be moved around and then passed to a different `select!` call; the move
// can invalidate self-referential borrows so the input future must satisfy `Unpin`
#[macro_export]
macro_rules! r#await {
    ($g:expr) => {
        match $g {
            mut pinned => {
                use core::ops::Generator;
                loop {
                    match unsafe { core::pin::Pin::new_unchecked(&mut pinned).resume() } {
                        core::ops::GeneratorState::Yielded(()) => {}
                        core::ops::GeneratorState::Complete(x) => break x,
                    }
                    yield ()
                }
            }
        }
    };
}

ジェネレータのステートマシンに沿った動作をしているようです。

#[r#async]アトリビュートですが、修行をサボっていたせいで未だに手続きマクロがいまいち読めません! が、関数の戻り値型をimpl Generator型にし、関数内の処理をジェネレータにしているっぽいです。

/// `async fn foo() { .. }` -> `#[r#async] fn foo() { .. }`
// NOTE the built-in `async fn` desugars to a generator and wraps it in a newtype that makes it
// `!Unpin`; this is required because `async fn`s allow self-referential borrows (i.e. `let x = ..;
// let y = &x; f().await; use(y)`). AFAICT, self referential borrows are not possible in generators
// (as of 1.39) so I think we don't need the newtype
#[proc_macro_attribute]
pub fn r#async(args: TokenStream, item: TokenStream) -> TokenStream {
    if !args.is_empty() {
        return parse::Error::new(Span::call_site(), "`#[async]` attribute takes no arguments")
            .to_compile_error()
            .into();
    }

    // snip

    let block = &item.block;
    quote!(
        #(#attrs)*
        #vis fn #ident #generics (
            #inputs
        ) -> impl core::ops::Generator<Yield = (), Return = #output> #(+ #lts)*
        #where_clause
        {
            move || #block
        }
    )
    .into()

collections

初期化時に任意のアロケータが指定できるコレクションです。 例えばVecなら次のような感じです。

|> collections/src/vec.rs

pub struct Vec<T, A>
where
    A: Alloc,
{
    allocator: A,
    cap: usize,
    len: usize,
    ptr: Unique<T>,
}

impl<A, T> Vec<T, A>
where
    A: Alloc,
{
    // `new`で`Alloc`トレイトを実装するAを引数として渡す
    pub fn new(allocator: A) -> Self {
        let cap = if mem::size_of::<T>() == 0 {
            usize::max_value()
        } else {
            0
        };

        Self {
            allocator,
            cap,
            len: 0,
            ptr: Unique::empty(),
        }
    }
    // snip

cortex-m-tm-alloc

Cortex-Mのスレッドモードでのみ使用できる「スレッドモードアロケータ」です。 get()を呼んだ際、スレッドモードならアロケータインスタンスを、そうでなければNoneが得られるのでした。

Cortex-Mを知っていれば、実装は素直です。SCBのICSRの値を読み込んで、スレッドモードならSomeを、そうでなければNoneを返しています。

|> cortex-m-tm-alloc/src/lib.rs

    pub unsafe fn get() -> Option<Self> {
        if cfg!(not(cortex_m)) {
            return None;
        }

        const SCB_ICSR: *const u32 = 0xE000_ED04 as *const u32;

        if SCB_ICSR.read_volatile() as u8 == 0 {
            // Thread mode (i.e. not within an interrupt or exception handler)
            Some(Private {
                _not_send_or_sync: PhantomData,
            })
        } else {
            None
        }
    }

allocatorアトリビュートの実装は歯が立たなかったです。出直します。

cortex-m-tm-executor

Executorの実装を見てみます。

アロケータとタスク配列、実行中かどうかを示すフラグをフィールドに持ちます。

|> cortex-m-tm-executor

pub struct Executor<A>
where
    A: Alloc + Copy,
{
    allocator: A,
    /// Spawned tasks
    tasks: UnsafeCell<Vec<Pin<Task<A>>, A>>,
    running: Cell<bool>,
}

まず、spawnです。 タスクをヒープ領域に作って、タスク配列に追加するだけです。 なので、spawn()を呼んだだけではタスクが実行されなかったわけですね。

impl<A> Executor<A>
where
    A: Alloc + Copy,
{
    // snip
    pub fn spawn<T>(&self, g: impl Generator<Yield = (), Return = T> + 'static) {
        // this alternative to `GenDrop` produces larger heap allocations
        // let g = || drop(r#await!(g));
        let task: Task<A> = Box::new(GenDrop { g }, self.allocator);
        unsafe {
            (*self.tasks.get()).push(task.into());
        }
    }
}

一方、block_onでは、ジェネレータが完了状態になるまで、タスク配列内のタスクを単純に順番に実行していることがわかります。

    pub fn block_on<T>(&self, g: impl Generator<Yield = (), Return = T>) -> T {
        self.running.set(true);

        pin_mut!(g);

        loop {
            // move forward the main task `g`
            if let GeneratorState::Complete(x) = g.as_mut().resume() {
                self.running.set(false);
                break x;
            }

            let n = unsafe { (*self.tasks.get()).len() };
            for i in (0..n).rev() {
                let s = {
                    let task: TaskMut =
                        unsafe { (*self.tasks.get()).get_unchecked_mut(i).as_mut() };
                    task.resume()
                };

                if let GeneratorState::Complete(()) = s {
                    // task completed -- release memory
                    let task = unsafe { (*self.tasks.get()).swap_remove(i) };
                    drop(task);
                }
            }
        }
    }

ジェネレータの実装について少し。 結果を破棄するGenDropジェネレータが実装されています。 resumeするとGeneratorStateが返ります。

GeneratorState::Completeになると、値をdrop()していることがわかります。

impl<G> Generator for GenDrop<G>
where
    G: Generator<Yield = ()>,
{
    type Yield = ();
    type Return = ();

    fn resume(self: Pin<&mut Self>) -> GeneratorState<(), ()> {
        match G::resume(self.g()) {
            GeneratorState::Yielded(()) => GeneratorState::Yielded(()),
            GeneratorState::Complete(x) => {
                drop(x);
                GeneratorState::Complete(())
            }
        }
    }
}

終わりに

完全に理解できなかったァ…。 とは言え、かなり理解は深まりました。

ただ、アプリケーションはともかく、実行エンジン作るのは、かなり大変そうですね…。