Rust no-stdのasync完全理解を目指そう!
はじめに
この記事はRust Advent Calendar 2019の17日目として書きました。
組込みRust界の神japaric
さんがno-std
環境でasync
を使うPoCレポジトリを公開しています。
理解できるかどうか非常に自信がありませんが、これは見てみるしかありません!
後日正式な記事が書かれるそうなので、それを待ったほうが得策かもしれません!
引用の領域超えている気がしますので、一応ライセンス表記します。 今回解説するレポジトリは、MIT license、もしくは、Apache License, Version 2.0、でライセンスされています。
目次
自分なりのまとめ
- 組込みの
no-std
環境で使えるasync-await
のproof of conceptを紹介するよ (nightlyは必要だけどね!) - cooperativeスケジューラ (executor) は割り込みハンドラから完全に隔離するよ
- なので割り込みハンドラで実行されるリアルタイム性が要求されるコードの予測性を損なわずに、cooperativeなコードを実行できるよ
- そのために、
executor
は#[global_allocator]
とは違う専用のメモリアロケータを使うよ - 現在Rustのcollectionは
global_allocator
を使うようにハードコーディングされているので、collectionを書き直す必要があるよ
つまるところ、async-await
を使うと、リアルタイムタスクの最悪実行時間が計算しずらくなるため、割り込みコンテキストと完全に分離できるように、async-await
をno-std
環境で使えるようにした、という話のようです。
現在のところ、no-std
環境はCortex-M
に限定されています。
実装も覗いてみましたが、手続きマクロ以外は割と読めそうな感じです。
ツール
nightlyツールチェインと、Cortex-M3向けのクロスビルド環境が必要です。
$ rustup override set nightly $ rustup update $ rustup target add thumbv7-none-eabi
後、実行環境として、qemu-system-armを使います。
$ cargo run --example async-await --features="nightly"
README
まずなにはともあれ、プロジェクトのREADMEを見てみましょう。 と思ったらREADMEだけで1100行あるではありませんか! これは前途多難な予感です…。
Goal
Real Time For the Masses (RTFM) と一緒に使えるリアルタイムアプリケーション向けのcooperativeスケジューラを作ることが目標のようです。 cooperativeスケジューラは最悪実行時間の解析が難しくなります。
Background
Asynchronous code
皆さんご存知のようにRust 1.39 からasync
/ await
の機能が安定化しました。
非同期コードはexecutor
によって実行されることを意味します。
executor
は標準ライブラリでは提供されていませんが、async-std
やtokio
といったマルチスレッドexecutor crateがあります。
async fn
のインスタンスはtask
になり、executorがスケジューリング、実行します。
// toolchain: 1.39.0 // async-std = "1.2.0" use async_std::task; fn main() { // schedule one instance of task `foo` -- nothing is printed at this point task::spawn(foo()); println!("start"); // start task `bar` and drive it to completion // this puts the executor to work // it's implementation defined whether `foo` or `bar` runs first or // whether `foo` gets to run at all task::block_on(bar()); } async fn foo() { println!("foo"); } async fn bar() { println!("bar"); }
$ cargo run start foo bar
サンプルコード内のコメントによると、タスクの実行順序はexecutorの実装依存なのですね。 勉強になります!
executorはタスクを協調的に実行します。
最も単純な場合、.await
に到達するまでタスクを実行し、実行をブロックする必要がある場合はそのタスクをサスペンドし、別のタスクをresume
します。
Some implementation details
シンタックス的には、ジェネレータは、サスペンションポイント (yield
) を含む、クロージャのような (|| { .. }
) ものです。
// toolchain: nightly-2019-12-02 use core::{pin::Pin, ops::Generator}; fn main() { let mut g = || { println!("A"); yield; println!("B"); yield; println!("C"); }; let mut state = Pin::new(&mut g).resume(); println!("{:?}", state); state = Pin::new(&mut g).resume(); println!("{:?}", state); state = Pin::new(&mut g).resume(); println!("{:?}", state); }
セマンティクス的には、ジェネレータはyield
間の状態マシンで、外部からresume
されることで状態遷移します。
ふむふむ。
executor
は状態マシンのリストを持っていて、全ての状態マシンが完了になるまで、resume
し続けます。
各状態マシンは異なるサイズで違うコードを実行するので、トレイトオブジェクト (Box<dyn Generator>
) としてリストされます。
これはサンプルコードを見ると理解しやすいです。
fn executor(mut tasks: Vec<Pin<Box<dyn Generator<Yield = (), Return = ()>>>>) { let mut n = tasks.len(); while n != 0 { for i in (0..n).rev() { let state = tasks[i].as_mut().resume(); if let GeneratorState::Complete(()) = state { tasks.swap_remove(i); // done; remove } } n = tasks.len(); } }
なるほど。タスク (状態マシン) をVec<Box<dyn Generator>>
として受け取り、各タスクのresume
を呼ぶ。
状態が完了
になると、Vec
からswap_remove
する、と。
Idea
アプローチとしては、非同期コードを#[idle]
もしくはfn main
に隔離します。
その理由は
- 協調的タスクには終了しないものと、短期間で終了するもがあるため、終了しない
#[idle]
タスクでexecutor
を動かすのが賢明である executor
で必要となる動的メモリ確保は、#[idle]
に制限されます。#[idle]
内ではリアルタイムでないアロケータを使用し、通常のタスクは動的メモリ確保をしないようにします。アロケータを#[idle]
内で排他的に使用することで、mutex
などの排他制御が不要になります。
ふむ、よくわからないので、もう少し先を見てみましょう。
Implementation
実装には2つのコンポーネントがあります。「スレッドモード」アロケータと「スレッドモード」executorです。 「スレッドモード」はARMの「スレッドモード」を意味しています。
アロケータとexecutorは、「スレッドモード」でのみ利用できます。 ARMの「ハンドラモード」ではアロケータとexecutorにアクセスできません。
「ハンドラモード」は主に割り込みや例外を処理するためのモードです。
Cortex-Mで、リセットハンドラは「スレッドモード」で実行されます。
RTFM
アプリでは、#[init]
と#[idle]
は「スレッドモード」で実行します。
TM (Thread-Mode) allocator
TMアロケータはseparate allocator
です。ん?どういうことでしょう?
#[global_allocator]
で定義されているものとは、独立アロケータです。ああ、そういうこと。
理想的には、RFC #1398で提案されているallocator-generic
コレクションをAlloc
トレイトを通じて使えると良いのですが、Alloc
トレイトは安定化していませんし、allocator-generic
コレクションは存在していません。
あー、なるほど。C++のようにカスタムアロケータが設定できるコレクションが提案されているのですね?(要確認)
今のコレクションは、#[global_allocator]
を使うようにハードコーディングされています。
TMアロケータはstableで実装できますが、コレクションはそうではないようです。 Rcで使用するcore::intrinsics::abort
がunstableであるなどの理由で。
stableではコレクションの型強制もできないようです。Box<impl Generator>
は使えないため、Box<dyn Generator>
を使います。
// toolchain: 1.39.0 use cortex_m_tm_alloc::allocator; use tlsf::Tlsf; #[allocator(lazy)] static mut A: Tlsf = { // `MEMORY` is transformed into `&'static mut [u8; 64]` static mut MEMORY: [u8; 64] = [0; 64]; let mut tlsf = Tlsf::new(); tlsf.extend(MEMORY); tlsf };
TMアロケータをA
という名前で定義します。A
は実行時に[TLSF]アロケータを初期化します。
A
アロケータのハンドラはget
コンストラクタで取得します。
get
コンストラクタは、Option<A>
を返します。「スレッドモード」で呼び出すとSome
ヴァリアントが、「ハンドラモード」で呼び出すとNone
が帰ります。
A
は、Copy
とAlloc
トレイトを実装したサイズ0の型です。Send
とSync
トレイトは実装していないため、インスタンスを割り込み / 例外ハンドラに渡すことができません。
な、なるほど…。
#[entry] fn main() -> ! { hprintln!("before A::get()").ok(); SCB::set_pendsv(); if let Some(a) = A::get() { hprintln!("after A::get()").ok(); SCB::set_pendsv(); // .. } else { // UNREACHABLE } // .. } #[exception] fn PendSV() { hprintln!("PendSV({:?})", A::get()).ok(); }
$ cargo run before A::get() PendSV(None) after A::get() PendSV(None)
PendSV
ハンドラ内では、A::get()
してもNone
になっています。
#[entry]
内では、Some
ヴァリアントが得られていますね(2回めのPendSVに突入していることから)。
if let Some(a) = A::get() { // .. let mut xs: Vec<i32, A> = Vec::new(a); for i in 0.. { xs.push(i); hprintln!("{:?}", xs).ok(); } }
一度アロケータインスタンスを取得すれば、コレクションの初期化時にアロケータのコピーを渡すことで、アロケータを使用できます。 アロケータはサイズ0の型なので、スタックサイズは増えません。
if let Some(a) = A::get() { // .. let mut xs: Vec<i32, A> = Vec::new(a); for i in 0.. { xs.push(i); hprintln!("{:?}", xs).ok(); } }
グローバルアロケータと同様に、TMアロケータもOut Of Memoryになる可能性があります。
その場合、#[oom]
アトリビュートを使って定義されたOut Of Memoryハンドラが呼ばれます。
#[alloc_oom::oom] fn oom(layout: Layout) -> ! { hprintln!("oom({:?})", layout).ok(); debug::exit(debug::EXIT_FAILURE); loop {} }
$ cargo run [0] [0, 1] [0, 1, 2] [0, 1, 2, 3] oom(Layout { size_: 32, align_: 4 }) $ echo $? 1
TM (Thread-Mode) executor
TM executorはTMアロケータに依存しています。
// toolchain: nightly-2019-12-02 use cortex_m_tm_alloc::allocator; use cortex_m_tm_executor::executor; #[allocator(lazy)] static mut A: Tlsf = { /* .. */ }; executor!(name = X, allocator = A);
TMアロケータ同様に、TM executorも「スレッドモード」のみでハンドラを取得できます。
get
コンストラクタはTM executorとTMアロケータを返します。
TM executorもCopy
トレイトを実装しますが、Send
やSync
は実装しません。
executorはタスクをspawn
するのに使います。
spawn
は、具体的なジェネレータを受け取り、Box
化し、内部キューに格納します。
spawn
自体は、ジェネレータ / タスクコードを実行しません!
タスクを実行するにはblock_on
APIを使います。
ふむ?とりあえずサンプルコードを見てみますか。
|> examples/tasks.rs
#[entry] fn main() -> ! { if let Some((x, _a)) = X::get() { x.spawn(move || { hprintln!(" A0").ok(); yield; hprintln!(" A1").ok(); // but of course you can `spawn` a task from a spawned task x.spawn(|| { hprintln!(" C0").ok(); yield; hprintln!(" C1").ok(); }); yield; hprintln!(" A2").ok(); // NOTE return value will be discarded 42 }); let ans = x.block_on(|| { hprintln!("B0").ok(); yield; hprintln!("B1").ok(); yield; hprintln!("B2").ok(); yield; hprintln!("B3").ok(); 42 }); hprintln!("the answer is {}", ans).ok(); } debug::exit(debug::EXIT_SUCCESS); loop {} }
上記コードを実行した結果は、次のようになります。
$ cargo run --example tasks --features="nightly" B0 A0 B1 A1 B2 C0 A2 B3 the answer is 42
もう一度サンプルコードに戻ってみると、下のジェネレータをexecutor X
でspawn
した時点では、まだ非同期コードは実行されません。
#[entry] fn main() -> ! { if let Some((x, _a)) = X::get() { x.spawn(move || { hprintln!(" A0").ok(); yield; hprintln!(" A1").ok(); // but of course you can `spawn` a task from a spawned task x.spawn(|| { hprintln!(" C0").ok(); yield; hprintln!(" C1").ok(); }); yield; hprintln!(" A2").ok(); // NOTE return value will be discarded 42 });
続く、block_on
でジェネレータが与えられると、実行を開始します。
B0
出力後、yield
すると、先ほどspawn
したタスクに制御が移り、A0
が出力されます。
let ans = x.block_on(|| { hprintln!("B0").ok(); yield; hprintln!("B1").ok(); yield; hprintln!("B2").ok(); yield; hprintln!("B3").ok(); 42 }); hprintln!("the answer is {}", ans).ok(); }
A1
ではさらにタスクをspawn
していますが、その時点では実行されておらず、B2
出力後のyield
で制御が移ってきます。
C0
後にyield
すると、A2
を出力するコードに制御が移っていますね。
block_on
で実行したジェネレータからは、戻り値 (42
) を受け取っています。
でもC1
が実行されていませんね?
そう、block_on
はspawnしたタスク全てが完了になることを保証しません。
単に、引数で渡されたジェネレータが完了になるまで、実行を進めるだけです。
block_on
をネストするとデッドロックする可能性があるため、TM executorではネストしたblock_on
呼び出しはパニックになるよう、実装されています。
#[r#async]
/ r#await!
block_on
をネストできないとすると、ジェネレータをどうやったら完了状態にできるのでしょうか?
r#await!
マクロを使います。ジェネレータを返す関数を簡単に書くために#[r#async]
アトリビュートもあります。
例として、割り込みハンドラから非同期にデータを受け取りたいとします。#[r#async]
を使って次のように書くことができます。
まずは、ジェネレータを簡単に書くためのアトリビュートです。
use core::ops::Generator; use heapless::{ spsc::Consumer, // consumer endpoint of a single-producer single-consumer queue ArrayLength, }; use gen_async_await::r#async; #[r#async] fn dequeue<T, N>(mut c: Consumer<'static, T, N>) -> (T, Consumer<'static, T, N>) where N: ArrayLength<T>, { loop { if let Some(x) = c.dequeue() { break (x, c); } yield } } // OR you could have written this; both are equivalent fn dequeue2<T, N>( mut c: Consumer<'static, T, N>, ) -> impl Generator<Yield = (), Return = (T, Consumer<'static, T, N>)> where N: ArrayLength<T>, { || loop { if let Some(x) = c.dequeue() { break (x, c); } yield } }
dequeue
はジェネレータを返す関数で、ジェネレータでは割り込みハンドラのProducer
からデータが送られてくるとSome
になるから、そこで値を返して、yield
しています。
んー?ジェネレータが複雑な型になったりすると有り難いのかな…?
アプリケーションは次のように書けます。
#[entry] fn main() -> ! { static mut Q: Queue<i32, consts::U4> = Queue(i::Queue::new()); let (p, mut c) = Q.split(); // send the producer to an interrupt handler send(p); if let Some((x, _a)) = X::get() { // task that asynchronously processes items produced by // the interrupt handler x.spawn(move || loop { let ret = r#await!(dequeue(c)); // <- ★ let item = ret.0; c = ret.1; // do stuff with `item` }); x.block_on(|| { // .. do something else .. }); } debug::exit(debug::EXIT_SUCCESS); loop {} }
うーん、難しい。dequeue(c)
はジェネレータを返しているので、x.spawn
の引数もジェネレータになると。
std_async::sync::Mutex
?
std_async
では、2つのタスク間でメモリを共有したい場合、Mutex
かRwLock
を使います。
task::spawn
APIの引数はSend
トレイトを実装したジェネレータです。
std_async
のジェネレータはマルチスレッドで動作し、並行実行される可能性があるからです。
use async_std::{sync::Mutex, task}; fn main() { let shared: &'static Mutex<u128> = Box::leak(Box::new(Mutex::new(0u128))); task::spawn(async move { let x = shared.lock().await; println!("{}", x); }); task::block_on(async move { *shared.lock().await += 1; }); }
TM executorは必ず同じコンテキストで動作し、タスクは1つずつ順番に実行されます。
そのためspawn
の引数はSend
を実装する必要がありません。
そのため、タスク間でデータを共有する際、Mutex
の代わりに、単にRefCell
かCell
を使うことができます。
あ、そうですね。
use core::cell::RefCell; #[entry] fn main() -> ! { static mut SHARED: RefCell<u64> = RefCell::new(0); if let Some((x, _a)) = X::get() { let shared: &'static _ = SHARED; x.spawn(move || loop { hprintln!("{}", shared.borrow()).ok(); yield; }); x.block_on(move || { *shared.borrow_mut() += 1; yield; *shared.borrow_mut() += 1; yield; }); } debug::exit(debug::EXIT_SUCCESS); loop {} }
うん、これはよくわかる。
実装を覗いてみよう
さて、ここまでがREADME
です (長い…) 。
r#await!
/ #[r#async]
r#await!
マクロは、わりと読めます。
|> gen-async-await/src/lib.rs
/// `e.await` -> `r#await(e)` // expansion is equivalent to the desugaring of `($g).await` -- see // rust-lang/rust/src/librustc/hir/lowering/expr.rs (Rust 1.39) // XXX Does `$g` need to satisfy the `: Unpin` bound? -- I think not because `$g` is drived to // completion so any self-referential borrow will be over by the time this macro returns control // back to the caller. This is unlike `futures::select!` which partially polls its input futures. // Those input futures may be moved around and then passed to a different `select!` call; the move // can invalidate self-referential borrows so the input future must satisfy `Unpin` #[macro_export] macro_rules! r#await { ($g:expr) => { match $g { mut pinned => { use core::ops::Generator; loop { match unsafe { core::pin::Pin::new_unchecked(&mut pinned).resume() } { core::ops::GeneratorState::Yielded(()) => {} core::ops::GeneratorState::Complete(x) => break x, } yield () } } } }; }
ジェネレータのステートマシンに沿った動作をしているようです。
#[r#async]
アトリビュートですが、修行をサボっていたせいで未だに手続きマクロがいまいち読めません!
が、関数の戻り値型をimpl Generator
型にし、関数内の処理をジェネレータにしているっぽいです。
/// `async fn foo() { .. }` -> `#[r#async] fn foo() { .. }` // NOTE the built-in `async fn` desugars to a generator and wraps it in a newtype that makes it // `!Unpin`; this is required because `async fn`s allow self-referential borrows (i.e. `let x = ..; // let y = &x; f().await; use(y)`). AFAICT, self referential borrows are not possible in generators // (as of 1.39) so I think we don't need the newtype #[proc_macro_attribute] pub fn r#async(args: TokenStream, item: TokenStream) -> TokenStream { if !args.is_empty() { return parse::Error::new(Span::call_site(), "`#[async]` attribute takes no arguments") .to_compile_error() .into(); } // snip let block = &item.block; quote!( #(#attrs)* #vis fn #ident #generics ( #inputs ) -> impl core::ops::Generator<Yield = (), Return = #output> #(+ #lts)* #where_clause { move || #block } ) .into()
collections
初期化時に任意のアロケータが指定できるコレクションです。
例えばVec
なら次のような感じです。
|> collections/src/vec.rs
pub struct Vec<T, A> where A: Alloc, { allocator: A, cap: usize, len: usize, ptr: Unique<T>, } impl<A, T> Vec<T, A> where A: Alloc, { // `new`で`Alloc`トレイトを実装するAを引数として渡す pub fn new(allocator: A) -> Self { let cap = if mem::size_of::<T>() == 0 { usize::max_value() } else { 0 }; Self { allocator, cap, len: 0, ptr: Unique::empty(), } } // snip
cortex-m-tm-alloc
Cortex-M
のスレッドモードでのみ使用できる「スレッドモードアロケータ」です。
get()
を呼んだ際、スレッドモードならアロケータインスタンスを、そうでなければNone
が得られるのでした。
Cortex-Mを知っていれば、実装は素直です。SCBのICSRの値を読み込んで、スレッドモードならSome
を、そうでなければNone
を返しています。
|> cortex-m-tm-alloc/src/lib.rs
pub unsafe fn get() -> Option<Self> { if cfg!(not(cortex_m)) { return None; } const SCB_ICSR: *const u32 = 0xE000_ED04 as *const u32; if SCB_ICSR.read_volatile() as u8 == 0 { // Thread mode (i.e. not within an interrupt or exception handler) Some(Private { _not_send_or_sync: PhantomData, }) } else { None } }
allocator
アトリビュートの実装は歯が立たなかったです。出直します。
cortex-m-tm-executor
Executor
の実装を見てみます。
アロケータとタスク配列、実行中かどうかを示すフラグをフィールドに持ちます。
|> cortex-m-tm-executor
pub struct Executor<A> where A: Alloc + Copy, { allocator: A, /// Spawned tasks tasks: UnsafeCell<Vec<Pin<Task<A>>, A>>, running: Cell<bool>, }
まず、spawn
です。
タスクをヒープ領域に作って、タスク配列に追加するだけです。
なので、spawn()
を呼んだだけではタスクが実行されなかったわけですね。
impl<A> Executor<A> where A: Alloc + Copy, { // snip pub fn spawn<T>(&self, g: impl Generator<Yield = (), Return = T> + 'static) { // this alternative to `GenDrop` produces larger heap allocations // let g = || drop(r#await!(g)); let task: Task<A> = Box::new(GenDrop { g }, self.allocator); unsafe { (*self.tasks.get()).push(task.into()); } } }
一方、block_on
では、ジェネレータが完了状態になるまで、タスク配列内のタスクを単純に順番に実行していることがわかります。
pub fn block_on<T>(&self, g: impl Generator<Yield = (), Return = T>) -> T { self.running.set(true); pin_mut!(g); loop { // move forward the main task `g` if let GeneratorState::Complete(x) = g.as_mut().resume() { self.running.set(false); break x; } let n = unsafe { (*self.tasks.get()).len() }; for i in (0..n).rev() { let s = { let task: TaskMut = unsafe { (*self.tasks.get()).get_unchecked_mut(i).as_mut() }; task.resume() }; if let GeneratorState::Complete(()) = s { // task completed -- release memory let task = unsafe { (*self.tasks.get()).swap_remove(i) }; drop(task); } } } }
ジェネレータの実装について少し。
結果を破棄するGenDrop
ジェネレータが実装されています。
resume
するとGeneratorState
が返ります。
GeneratorState::Complete
になると、値をdrop()
していることがわかります。
impl<G> Generator for GenDrop<G> where G: Generator<Yield = ()>, { type Yield = (); type Return = (); fn resume(self: Pin<&mut Self>) -> GeneratorState<(), ()> { match G::resume(self.g()) { GeneratorState::Yielded(()) => GeneratorState::Yielded(()), GeneratorState::Complete(x) => { drop(x); GeneratorState::Complete(()) } } } }
終わりに
完全に理解できなかったァ…。 とは言え、かなり理解は深まりました。
ただ、アプリケーションはともかく、実行エンジン作るのは、かなり大変そうですね…。