Rust の組込み用非同期フレームワーク Embassy (3)

組込み向けの async/await フレームワーク embassy をゆるーく眺めていくシリーズです。 前回は main についているマクロがなにやってるか見てみました。

github.com

そして個人的には、 Spawner とか Executor がどうなってるねん!が気になるので、そこ突っ込んでいきましょう。

ということで embassy-executor を見ていきます。

アーキテクチャ固有の実装はかなりコンパクトになっています。 とりあえず Cortex-M の実装 embassy-executor/src/arch/cortex_m.rs でも見てみましょう。

executor-threadexecutor-interrupt があるのですが、一旦 executor-thread に絞って読んでいきます。 /// Thread mode executor, using WFE/SEV. とあるとおり、Cortex-M の WFE 命令と SEV 命令を使って、スリープ / スリープからの復帰をする Executor のようです。

SEV 命令を実行する ThreadPender 構造体が用意されています。 トレイトになっていないのですね?

    #[derive(Copy, Clone)]
    pub(crate) struct ThreadPender;

    impl ThreadPender {
        pub(crate) fn pend(self) {
            unsafe { core::arch::asm!("sev") }
        }
    }

上位側で arch 固有の実装がないとダメになっているし、crate 内にアーキテクチャ固有の実装を全部置いているから、トレイトで縛らなくて良い、って感じなのですかね? pend() だけあれば良いだけですし、そういう選択肢もありですね。

#[derive(Clone, Copy)]
pub(crate) enum PenderInner {
    #[cfg(feature = "executor-thread")]
    Thread(crate::arch::ThreadPender),
    #[cfg(feature = "executor-interrupt")]
    Interrupt(crate::arch::InterruptPender),
    #[cfg(feature = "pender-callback")]
    Callback { func: fn(*mut ()), context: *mut () },
}

Executor も同様に、トレイト使わずに各実装でインターフェース合わせているっぽいですね ((wasm 向けの実装だけ、run がなくて start になっているけど、これで良いのか…?)) 。

これで実装全部です。

    /// Thread mode executor, using WFE/SEV.
    ///
    /// This is the simplest and most common kind of executor. It runs on
    /// thread mode (at the lowest priority level), and uses the `WFE` ARM instruction
    /// to sleep when it has no more work to do. When a task is woken, a `SEV` instruction
    /// is executed, to make the `WFE` exit from sleep and poll the task.
    ///
    /// This executor allows for ultra low power consumption for chips where `WFE`
    /// triggers low-power sleep without extra steps. If your chip requires extra steps,
    /// you may use [`raw::Executor`] directly to program custom behavior.
    pub struct Executor {
        inner: raw::Executor,
        not_send: PhantomData<*mut ()>,
    }

    impl Executor {
        /// Create a new Executor.
        pub fn new() -> Self {
            Self {
                inner: raw::Executor::new(Pender(PenderInner::Thread(ThreadPender))),
                not_send: PhantomData,
            }
        }

        /// Run the executor.
        ///
        /// The `init` closure is called with a [`Spawner`] that spawns tasks on
        /// this executor. Use it to spawn the initial task(s). After `init` returns,
        /// the executor starts running the tasks.
        ///
        /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`),
        /// for example by passing it as an argument to the initial tasks.
        ///
        /// This function requires `&'static mut self`. This means you have to store the
        /// Executor instance in a place where it'll live forever and grants you mutable
        /// access. There's a few ways to do this:
        ///
        /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe)
        /// - a `static mut` (unsafe)
        /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe)
        ///
        /// This function never returns.
        pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
            init(self.inner.spawner());

            loop {
                unsafe {
                    self.inner.poll();
                    asm!("wfe");
                };
            }
        }
    }

run の内容も、 Spawner 初期化して無限ループで、 poll() し終わったら、 WFE 命令で寝る!以上!です。 割り込み受けたり、SVE 命令受けるともう1回 poll() して〜、となるわけですね。

ちなみに poll() の中では RunQueue という実行可能なタスクが積まれている Queue があって、これが空になるまで Task を実行するようになっています。 つまり、poll() から抜けてくると、Executor としては、実行しないといけない async タスクが存在しない状態になっているわけですね。 シンプル!

poll() の実体は、raw::SyncExecutor が持っています。

ちょっと Timer の実装が邪魔なのですけど、それを取り除いちゃうとこれだけです。 RunQueue からタスクを1個ずつ取り出して、実行しているだけですね。

    /// # Safety
    ///
    /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created.
    pub(crate) unsafe fn poll(&'static self) {
        #[allow(clippy::never_loop)]
        loop {
            self.run_queue.dequeue_all(|p| {
                let task = p.header();

                let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
                if state & STATE_SPAWNED == 0 {
                    // If task is not running, ignore it. This can happen in the following scenario:
                    //   - Task gets dequeued, poll starts
                    //   - While task is being polled, it gets woken. It gets placed in the queue.
                    //   - Task poll finishes, returning done=true
                    //   - RUNNING bit is cleared, but the task is already in the queue.
                    return;
                }

                // Run the task
                task.poll_fn.get().unwrap_unchecked()(p);
            });
        }
    }

ここまで来れば、あとは

  • Spawner が Executor にタスクを追加する流れ
  • Waker から pend 呼び出すまでの流れ

あたりがわかれば大まかな仕組みが理解できそうです。

Waker は Rust の core (std) 経由するのは知っているので、 Spawner の方が楽に読めそうでしょうか? 次回は Spawner 読んでみましょう。