Rust の組込み用非同期フレームワーク Embassy (3)
組込み向けの async/await フレームワーク embassy をゆるーく眺めていくシリーズです。 前回は main についているマクロがなにやってるか見てみました。
そして個人的には、 Spawner とか Executor がどうなってるねん!が気になるので、そこ突っ込んでいきましょう。
ということで embassy-executor を見ていきます。
アーキテクチャ固有の実装はかなりコンパクトになっています。 とりあえず Cortex-M の実装 embassy-executor/src/arch/cortex_m.rs でも見てみましょう。
executor-thread
と executor-interrupt
があるのですが、一旦 executor-thread
に絞って読んでいきます。
/// Thread mode executor, using WFE/SEV.
とあるとおり、Cortex-M の WFE 命令と SEV 命令を使って、スリープ / スリープからの復帰をする Executor のようです。
SEV 命令を実行する ThreadPender
構造体が用意されています。
トレイトになっていないのですね?
#[derive(Copy, Clone)] pub(crate) struct ThreadPender; impl ThreadPender { pub(crate) fn pend(self) { unsafe { core::arch::asm!("sev") } } }
上位側で arch 固有の実装がないとダメになっているし、crate 内にアーキテクチャ固有の実装を全部置いているから、トレイトで縛らなくて良い、って感じなのですかね?
pend()
だけあれば良いだけですし、そういう選択肢もありですね。
#[derive(Clone, Copy)] pub(crate) enum PenderInner { #[cfg(feature = "executor-thread")] Thread(crate::arch::ThreadPender), #[cfg(feature = "executor-interrupt")] Interrupt(crate::arch::InterruptPender), #[cfg(feature = "pender-callback")] Callback { func: fn(*mut ()), context: *mut () }, }
Executor
も同様に、トレイト使わずに各実装でインターフェース合わせているっぽいですね ((wasm 向けの実装だけ、run
がなくて start
になっているけど、これで良いのか…?)) 。
これで実装全部です。
/// Thread mode executor, using WFE/SEV. /// /// This is the simplest and most common kind of executor. It runs on /// thread mode (at the lowest priority level), and uses the `WFE` ARM instruction /// to sleep when it has no more work to do. When a task is woken, a `SEV` instruction /// is executed, to make the `WFE` exit from sleep and poll the task. /// /// This executor allows for ultra low power consumption for chips where `WFE` /// triggers low-power sleep without extra steps. If your chip requires extra steps, /// you may use [`raw::Executor`] directly to program custom behavior. pub struct Executor { inner: raw::Executor, not_send: PhantomData<*mut ()>, } impl Executor { /// Create a new Executor. pub fn new() -> Self { Self { inner: raw::Executor::new(Pender(PenderInner::Thread(ThreadPender))), not_send: PhantomData, } } /// Run the executor. /// /// The `init` closure is called with a [`Spawner`] that spawns tasks on /// this executor. Use it to spawn the initial task(s). After `init` returns, /// the executor starts running the tasks. /// /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), /// for example by passing it as an argument to the initial tasks. /// /// This function requires `&'static mut self`. This means you have to store the /// Executor instance in a place where it'll live forever and grants you mutable /// access. There's a few ways to do this: /// /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe) /// - a `static mut` (unsafe) /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) /// /// This function never returns. pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { init(self.inner.spawner()); loop { unsafe { self.inner.poll(); asm!("wfe"); }; } } }
run
の内容も、 Spawner
初期化して無限ループで、 poll()
し終わったら、 WFE 命令で寝る!以上!です。
割り込み受けたり、SVE 命令受けるともう1回 poll()
して〜、となるわけですね。
ちなみに poll()
の中では RunQueue
という実行可能なタスクが積まれている Queue があって、これが空になるまで Task を実行するようになっています。
つまり、poll()
から抜けてくると、Executor としては、実行しないといけない async タスクが存在しない状態になっているわけですね。
シンプル!
poll()
の実体は、raw::SyncExecutor が持っています。
ちょっと Timer の実装が邪魔なのですけど、それを取り除いちゃうとこれだけです。
RunQueue
からタスクを1個ずつ取り出して、実行しているだけですね。
/// # Safety /// /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created. pub(crate) unsafe fn poll(&'static self) { #[allow(clippy::never_loop)] loop { self.run_queue.dequeue_all(|p| { let task = p.header(); let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); if state & STATE_SPAWNED == 0 { // If task is not running, ignore it. This can happen in the following scenario: // - Task gets dequeued, poll starts // - While task is being polled, it gets woken. It gets placed in the queue. // - Task poll finishes, returning done=true // - RUNNING bit is cleared, but the task is already in the queue. return; } // Run the task task.poll_fn.get().unwrap_unchecked()(p); }); } }
ここまで来れば、あとは
- Spawner が Executor にタスクを追加する流れ
- Waker から pend 呼び出すまでの流れ
あたりがわかれば大まかな仕組みが理解できそうです。
Waker は Rust の core (std) 経由するのは知っているので、 Spawner の方が楽に読めそうでしょうか? 次回は Spawner 読んでみましょう。