Rust の組込み用非同期フレームワーク Embassy (7)

自分のためだけに Embassy コードリーディングシリーズです。 nRF52840 向けの Waker を wake している実装を見ていきます。

GPIO 割り込みから wake() するのが、おそらく最も単純な実装でしょう、ということで GPIO の example を探します。 examples/nrf52840/src/bin/gpiote_port.rs

中身は下のような感じです。 Task Pool が 4 になっていて、同じタスクを複数 spawn できるようになっているようですが、そこは一旦無視しましょう。 pin.wait_for_low() を見るのが良さそうです。

#[embassy_executor::task(pool_size = 4)]
async fn button_task(n: usize, mut pin: Input<'static, AnyPin>) {
    loop {
        pin.wait_for_low().await;
        info!("Button {:?} pressed!", n);
        pin.wait_for_high().await;
        info!("Button {:?} released!", n);
    }
}

#[embassy_executor::main]
async fn main(spawner: Spawner) {
    let p = embassy_nrf::init(Default::default());
    info!("Starting!");

    let btn1 = Input::new(p.P0_11.degrade(), Pull::Up);
    let btn2 = Input::new(p.P0_12.degrade(), Pull::Up);
    let btn3 = Input::new(p.P0_24.degrade(), Pull::Up);
    let btn4 = Input::new(p.P0_25.degrade(), Pull::Up);

    unwrap!(spawner.spawn(button_task(1, btn1)));
    unwrap!(spawner.spawn(button_task(2, btn2)));
    unwrap!(spawner.spawn(button_task(3, btn3)));
    unwrap!(spawner.spawn(button_task(4, btn4)));
}

InputFlex のラッパーになっています。 wait_for_low() を呼び出すと、Flexwait_for_low() を呼びます。

/// GPIO input driver.
pub struct Input<'d, T: Pin> {
    pub(crate) pin: Flex<'d, T>,
}

impl<'d, T: GpioPin> Input<'d, T> {
    /// Wait until the pin is low. If it is already low, return immediately.
    pub async fn wait_for_low(&mut self) {
        self.pin.wait_for_low().await
    }

ちなみに Flex は Flexible pin のことで、組込み Rust では GPIO ピンの状態を型として表現することが多いのですが、どのような状態も取れるピンとして定義されています。

Flex::wait_for_low() では、GPIO 入力が low になったら割り込みが入るように設定して、PortInputFutureインスタンスを作って await しています。

impl<'d, T: GpioPin> Flex<'d, T> {
    /// Wait until the pin is low. If it is already low, return immediately.
    pub async fn wait_for_low(&mut self) {
        self.pin.conf().modify(|_, w| w.sense().low());
        PortInputFuture::new(&mut self.pin).await
    }

重要なのは Future トレイトを実装している PortInputFuture ですね。 poll の実装は次のようになっています。 PORT ごとに Waker を持っていて、register() で登録するようです。 最後は、GPIO 割り込みが検出されて SENSE が無効になっていたら、Poll::Ready(()) を返しています。

impl<'a> Future for PortInputFuture<'a> {
    type Output = ();

    fn poll(self: core::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        PORT_WAKERS[self.pin.pin_port() as usize].register(cx.waker());

        if self.pin.conf().read().sense().is_disabled() {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

register() の部分ですが、PORT_WAKERSAtomicWaker の static な配列になっており、AtomicWaker がこうなので、割り込み待ちしている GPIO ピンの Waker を wake したら、このときに登録したコンテキストが run queue に入る、ということになります。

/// Utility struct to register and wake a waker.
pub struct AtomicWaker {
    waker: Mutex<CriticalSectionRawMutex, Cell<Option<Waker>>>,
}

impl AtomicWaker {
    /// Register a waker. Overwrites the previous waker, if any.
    pub fn register(&self, w: &Waker) {
        critical_section::with(|cs| {
            let cell = self.waker.borrow(cs);
            cell.set(match cell.replace(None) {
                Some(w2) if (w2.will_wake(w)) => Some(w2),
                _ => Some(w.clone()),
            })
        })
    }

    /// Wake the registered waker, if any.
    pub fn wake(&self) {
        critical_section::with(|cs| {
            let cell = self.waker.borrow(cs);
            if let Some(w) = cell.replace(None) {
                w.wake_by_ref();
                cell.set(Some(w));
            }
        })
    }
}

では続いて、GPIO の割り込みハンドラ embassy-nrf/src/gpiote.rs です。 重要なところはコード中にコメントで補足した、wake() しているところです。 この GPIO 割り込みの中で、wait_for_low() している実行コンテキストを run queue に入れて、WFE から復帰すると、無事 wait_for_low() の続きからプログラムが実行されることになります。

// 抜粋
unsafe fn handle_gpiote_interrupt() {
    let g: &pac::gpiote::RegisterBlock = regs();

    if g.events_port.read().bits() != 0 {
        g.events_port.write(|w| w);
        let ports = &[&*pac::P0::ptr(), &*pac::P1::ptr()];

        for (port, &p) in ports.iter().enumerate() {
            let bits = p.latch.read().bits();
            for pin in BitIter(bits) {
                // ここで Waker を wake() している
                p.pin_cnf[pin as usize].modify(|_, w| w.sense().disabled());
                PORT_WAKERS[port * 32 + pin as usize].wake();
            }
            p.latch.write(|w| w.bits(bits));
        }
    }
}

ということで、 embassy 自体には他にも機能がありそうですが、最もベースとなる部分の仕組みは大体わかった気になれましたね。

Rust の組込み用非同期フレームワーク Embassy (6)

誰得 Embassy コードリーディングシリーズです。 Spawner 周りを見終わったので、次は Waker いってみましょう。

Waker はおそらく wake 呼び出す側から見た方がわかりやすいでしょう。 std で動く example の中だと serial.rs でしょうか。

use async_io::Async;
use embassy_executor::Executor;
use embedded_io::asynch::Read;
use log::*;
use nix::sys::termios;
use static_cell::StaticCell;

use self::serial_port::SerialPort;

#[embassy_executor::task]
async fn run() {
    // Open the serial port.
    let baudrate = termios::BaudRate::B115200;
    let port = SerialPort::new("/dev/ttyACM0", baudrate).unwrap();
    //let port = Spy::new(port);

    // Use async_io's reactor for async IO.
    // This demonstrates how embassy's executor can drive futures from another IO library.
    // Essentially, async_io::Async converts from AsRawFd+Read+Write to futures's AsyncRead+AsyncWrite
    let port = Async::new(port).unwrap();

    // We can then use FromStdIo to convert from futures's AsyncRead+AsyncWrite
    // to embedded_io's async Read+Write.
    //
    // This is not really needed, you could write the code below using futures::io directly.
    // It's useful if you want to have portable code across embedded and std.
    let mut port = embedded_io::adapters::FromFutures::new(port);

    info!("Serial opened!");

    loop {
        let mut buf = [0u8; 256];
        let n = port.read(&mut buf).await.unwrap();
        info!("read {:?}", &buf[..n]);
    }
}

と思ったら embassy 用のアダプター層があるせいでちょっと大変ですね。 SerialPortAsync<T> でラップされていて (この時点では futures-io に対応)、さらに embedded-io のアダプターが適用されています、と。

embedded-io のアダプターは FromFutures<T> で、read() を呼ぶと、poll_fn() の中で、アダプターの中身の poll_read() を呼びます、と。

/// Adapter from `futures::io` traits.
pub struct FromFutures<T: ?Sized> {
    inner: T,
}

impl<T: futures::io::AsyncRead + Unpin + ?Sized> crate::asynch::Read for FromFutures<T> {
    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
        poll_fn(|cx| Pin::new(&mut self.inner).poll_read(cx, buf)).await
    }
}

poll_read() はどうなっているかと言うと、さらに中身から read() して、io::ErrorKind::WouldBlock かそれ以外かで戻り値が違ってきます。 データが読み込めたら Poll::Ready でそのデータが返るのと、WouldBlock 以外のエラーでも Poll::Ready でエラーが返りそうですね。 ready! マクロが何かというと?

impl<T: Read> AsyncRead for Async<T> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        loop {
            match (*self).get_mut().read(buf) {
                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
                res => return Poll::Ready(res),
            }
            ready!(self.poll_readable(cx))?;
        }
    }

poll_readable() が返す Poll を unwrap するか Pending を返すようになっています。 1つ上の match が Poll::Ready か io エラーしか返さないので、ここで Pending を返しているだけ、という感じですかね。

/// Unwraps `Poll<T>` or returns [`Pending`][`core::task::Poll::Pending`].
#[macro_export]
macro_rules! ready {
    ($e:expr $(,)?) => {
        match $e {
            core::task::Poll::Ready(t) => t,
            core::task::Poll::Pending => return core::task::Poll::Pending,
        }
    };
}

Async<T>::poll_readable() 見ると io エラー以外は意味のある値が返らなさそうなので、ready! マクロの core::task::Poll::Ready(t) => t は io エラー、それ以外だと Poll::Pending が返りそうです。

impl<T> Async<T> {
// ...
    pub fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.source.poll_readable(cx)
    }

poll_readable() をもう少し下っていくと async_io の Source::poll_readable() -> poll_ready() になります、と。 で、ここで Waker を登録するようです。

impl Source {
    /// Polls the I/O source for readability.
    pub(crate) fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.poll_ready(READ, cx)
    }
// ...
    /// Registers a waker from `poll_readable()` or `poll_writable()`.
    ///
    /// If a different waker is already registered, it gets replaced and woken.
    fn poll_ready(&self, dir: usize, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let mut state = self.state.lock().unwrap();

        // Check if the reactor has delivered an event.
        if let Some((a, b)) = state[dir].ticks {
            // If `state[dir].tick` has changed to a value other than the old reactor tick,
            // that means a newer reactor tick has delivered an event.
            if state[dir].tick != a && state[dir].tick != b {
                state[dir].ticks = None;
                return Poll::Ready(Ok(()));
            }
        }

        let was_empty = state[dir].is_empty();

        // Register the current task's waker.
        if let Some(w) = state[dir].waker.take() {
            if w.will_wake(cx.waker()) {
                state[dir].waker = Some(w);
                return Poll::Pending;
            }
            // Wake the previous waker because it's going to get replaced.
            panic::catch_unwind(|| w.wake()).ok();
        }
        state[dir].waker = Some(cx.waker().clone());
        state[dir].ticks = Some((Reactor::get().ticker(), state[dir].tick));

        // Update interest in this I/O handle.
        if was_empty {
            Reactor::get().poller.modify(
                self.raw,
                Event {
                    key: self.key,
                    readable: !state[READ].is_empty(),
                    writable: !state[WRITE].is_empty(),
                },
            )?;
        }

        Poll::Pending
    }

Waker を clone するか (state[dir].waker = Some(cx.waker().clone());) すでに Waker が登録されていたら即 Pending を返しているみたいです。 あとは、state[dir].wakerwake() を呼び出すところを探せば良さそうです。

async-io の ReactorLock::react の中でやっていそうですね。

impl ReactorLock<'_> {
    /// Processes new events, blocking until the first event or the timeout.
    pub(crate) fn react(&mut self, timeout: Option<Duration>) -> io::Result<()> {
        let mut wakers = Vec::new();
// ...
        // Block on I/O events.
        let res = match self.reactor.poller.wait(&mut self.events, timeout) {
            // No I/O events occurred.
            Ok(0) => {
                if timeout != Some(Duration::from_secs(0)) {
                    // The non-zero timeout was hit so fire ready timers.
                    self.reactor.process_timers(&mut wakers);
                }
                Ok(())
            }

            // At least one I/O event occurred.
            Ok(_) => {
                // Iterate over sources in the event list.
                let sources = self.reactor.sources.lock().unwrap();

                for ev in self.events.iter() {
                    // Check if there is a source in the table with this key.
                    if let Some(source) = sources.get(ev.key) {
                        let mut state = source.state.lock().unwrap();

                        // Collect wakers if a writability event was emitted.
                        for &(dir, emitted) in &[(WRITE, ev.writable), (READ, ev.readable)] {
                            if emitted {
                                state[dir].tick = tick;
                                state[dir].drain_into(&mut wakers);
                            }
                        }
// ...
                    }
                }

                Ok(())
            }
// ...
        };

        // Wake up ready tasks.
        log::trace!("react: {} ready wakers", wakers.len());
        for waker in wakers {
            // Don't let a panicking waker blow everything up.
            panic::catch_unwind(|| waker.wake()).ok();
        }

        res
    }
}

self.reactor.poller.wait(&mut self.events, timeout) の中を見ていくと epoll_wait 呼んでいたので、大体予想通りですね。

これが、マイコンターゲットだと割り込み待ちで、Waker::wake() を呼ぶのでしょう。 nRF あたりの割り込み待ち実装を次回眺めてから、Waker::wake() の中身を追いかければ、大体コアの部分は理解できそうですね。

Rust の組込み用非同期フレームワーク Embassy (5)

ゆるふわ Embassy コードリーディングシリーズです。 前回 spawn 周りはあとタスクの構造だけ、というところまで来たのでタスクの構造を見てみましょう。

タスク関連の構造体としては

  • TaskHeader
  • TaskStorage
  • TaskRef

があります。このへん で定義されています。

TaskHeader は次のように定義されています。 日本語コメントは私の注釈です。

/// Raw task header for use in task pointers.
pub(crate) struct TaskHeader {
    // spawn されたかどうかの状態
    pub(crate) state: AtomicU32,
    // RunQueue に入っている次の TaskHeader への参照
    pub(crate) run_queue_item: RunQueueItem,
    // タスクを実行する Executor への参照
    pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>,
    // poll 時に呼び出す関数ポインタ。実体は `TaskStorage::poll`
    poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,
}

TaskStrorage は TaskHeader + Future です。 TaskStorage::poll()Future::poll() を呼び出しています。

#[repr(C)]
pub struct TaskStorage<F: Future + 'static> {
    raw: TaskHeader,
    future: UninitCell<F>, // Valid if STATE_SPAWNED
}

impl<F: Future + 'static> TaskStorage<F> {
    const NEW: Self = Self::new();
// ...
    unsafe fn poll(p: TaskRef) {
        let this = &*(p.as_ptr() as *const TaskStorage<F>);

        let future = Pin::new_unchecked(this.future.as_mut());
        let waker = waker::from_task(p);
        let mut cx = Context::from_waker(&waker);
        match future.poll(&mut cx) {
            Poll::Ready(_) => {
                this.future.drop_in_place();
                this.raw.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel);

                #[cfg(feature = "integrated-timers")]
                this.raw.expires_at.set(Instant::MAX);
            }
            Poll::Pending => {}
        }

        // the compiler is emitting a virtual call for waker drop, but we know
        // it's a noop for our waker.
        mem::forget(waker);
    }

poll() のパスを辿ると、こんな感じになりそうです。

  • Executor(SyncExecutor)::poll()
    • TaskHeader::poll_fn() (実体は TaskStorage::poll()
      • Future::poll()

TaskRefTaskHeader の参照を持っていますが、コメントにある通り、実質は TaskStorage への参照です (ただし Future は型は消えています) 。

/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased.
#[derive(Clone, Copy)]
pub struct TaskRef {
    ptr: NonNull<TaskHeader>,
}

impl TaskRef {
    fn new<F: Future + 'static>(task: &'static TaskStorage<F>) -> Self {
        Self {
            ptr: NonNull::from(task).cast(),
        }
    }

    /// Safety: The pointer must have been obtained with `Task::as_ptr`
    pub(crate) unsafe fn from_ptr(ptr: *const TaskHeader) -> Self {
        Self {
            ptr: NonNull::new_unchecked(ptr as *mut TaskHeader),
        }
    }

    pub(crate) fn header(self) -> &'static TaskHeader {
        unsafe { self.ptr.as_ref() }
    }

    /// The returned pointer is valid for the entire TaskStorage.
    pub(crate) fn as_ptr(self) -> *const TaskHeader {
        self.ptr.as_ptr()
    }
}

一時、TaskStorage::poll()Poll::Ready のときの値捨てちゃってるけど、async 関数の戻り値ある場合はどうするのでしょう? と思ったりしましたが、これはあくまでも、embassy_executor の Task を実行しているだけで、embassy_executor の Task は戻り値を持たないのでしょう、という理解に到達しました。

examples/std/src/bin/serial.rs では Task の中で、async 関数を呼んで、シリアルから読み取ったデータを戻り値として受け取っています。

#[embassy_executor::task]
async fn run() {
    // Open the serial port.
    let baudrate = termios::BaudRate::B115200;
    let port = SerialPort::new("/dev/ttyACM0", baudrate).unwrap();
    //let port = Spy::new(port);

    // Use async_io's reactor for async IO.
    // This demonstrates how embassy's executor can drive futures from another IO library.
    // Essentially, async_io::Async converts from AsRawFd+Read+Write to futures's AsyncRead+AsyncWrite
    let port = Async::new(port).unwrap();

    // We can then use FromStdIo to convert from futures's AsyncRead+AsyncWrite
    // to embedded_io's async Read+Write.
    //
    // This is not really needed, you could write the code below using futures::io directly.
    // It's useful if you want to have portable code across embedded and std.
    let mut port = embedded_io::adapters::FromFutures::new(port);

    info!("Serial opened!");

    loop {
        let mut buf = [0u8; 256];
        let n = port.read(&mut buf).await.unwrap();
        info!("read {:?}", &buf[..n]);
    }
}

Future についてはこちらのブログがとてもよくまとまっていて、助かりました。

blog.tiqwab.com

次は Waker 見ていきましょうね。

Rust の組込み用非同期フレームワーク Embassy (4)

ゆるふわ Embassy コードリーディングシリーズです。 前回 Executor がわかってきた気持ちになったので、もう少し潜ってみましょう。

今回は Spawner いってみます。

コメントや構造体定義を見るに、Spawner は特定の Executor に紐付いていることがわかります。 他スレッドからタスクを作りたい場合は SendSpawner を使え、とあります (こちらは一旦放置) 。

/// Handle to spawn tasks into an executor.
///
/// This Spawner can spawn any task (Send and non-Send ones), but it can
/// only be used in the executor thread (it is not Send itself).
///
/// If you want to spawn tasks from another thread, use [SendSpawner].
#[derive(Copy, Clone)]
pub struct Spawner {
    executor: &'static raw::Executor,
    not_send: PhantomData<*mut ()>,
}

今の実行コンテキストから Spawner を取得するメソッドが定義されています。 こういうのはおもしろいですね。

poll_fn は標準ライブラリの関数で、タスクコンテキストを引数にとるクロージャ (関数) を実行できるみたいですね。 コンテキストからタスクを取り出して、タスクから Executor を取り出して…と順番に辿って、 Spawner を作っています。

Poll::Ready で戻り値返してあげれば、poll_fn() の結果として中身が得られる、と。

    /// Get a Spawner for the current executor.
    ///
    /// This function is `async` just to get access to the current async
    /// context. It returns instantly, it does not block/yield.
    ///
    /// # Panics
    ///
    /// Panics if the current executor is not an Embassy executor.
    pub async fn for_current_executor() -> Self {
        poll_fn(|cx| {
            let task = raw::task_from_waker(cx.waker());
            let executor = unsafe { task.header().executor.get().unwrap_unchecked() };
            let executor = unsafe { raw::Executor::wrap(executor) };
            Poll::Ready(Self::new(executor))
        })
        .await
    }

下の1行だけよくわかってないので、深堀りしてみましょう。

let task = raw::task_from_waker(cx.waker());

うーん!けっこう大変な感じなやつが出てきてしまいましたね。 標準ライブラリの RawWaker の構造を見てみます。

/// Get a task pointer from a waker.
///
/// This can be used as an optimization in wait queues to store task pointers
/// (1 word) instead of full Wakers (2 words). This saves a bit of RAM and helps
/// avoid dynamic dispatch.
///
/// You can use the returned task pointer to wake the task with [`wake_task`](super::wake_task).
///
/// # Panics
///
/// Panics if the waker is not created by the Embassy executor.
pub fn task_from_waker(waker: &Waker) -> TaskRef {
    // safety: OK because WakerHack has the same layout as Waker.
    // This is not really guaranteed because the structs are `repr(Rust)`, it is
    // indeed the case in the current implementation.
    // TODO use waker_getters when stable. https://github.com/rust-lang/rust/issues/96992
    let hack: &WakerHack = unsafe { mem::transmute(waker) };
    if hack.vtable != &VTABLE {
        panic!("Found waker not created by the Embassy executor. `embassy_time::Timer` only works with the Embassy executor.")
    }

    // safety: our wakers are always created with `TaskRef::as_ptr`
    unsafe { TaskRef::from_ptr(hack.data as *const TaskHeader) }
}

struct WakerHack {
    data: *const (),
    vtable: &'static RawWakerVTable,
}

コメントにある通り、WakerHack と同じ構造ですね。 この Executor が使える data 部分に Embassy では TaskRef (TaskHeader) を入れていて、vtable には waker.rs で作っている VTABLE の参照を格納している、と。

/// A `RawWaker` allows the implementor of a task executor to create a [`Waker`]
/// which provides customized wakeup behavior.
///
/// [vtable]: https://en.wikipedia.org/wiki/Virtual_method_table
///
/// It consists of a data pointer and a [virtual function pointer table (vtable)][vtable]
/// that customizes the behavior of the `RawWaker`.
#[derive(PartialEq, Debug)]
#[stable(feature = "futures_api", since = "1.36.0")]
pub struct RawWaker {
    /// A data pointer, which can be used to store arbitrary data as required
    /// by the executor. This could be e.g. a type-erased pointer to an `Arc`
    /// that is associated with the task.
    /// The value of this field gets passed to all functions that are part of
    /// the vtable as the first parameter.
    data: *const (),
    /// Virtual function pointer table that customizes the behavior of this waker.
    vtable: &'static RawWakerVTable,
}

Spawner のおもしろメソッド見ていたらだいぶ横道にそれてしまいました。

肝心の spawn() メソッドは何をやっているのかと言うと…。 SpawnToken を引数で受け取って、Executor::spanw() を呼び出しています。

    /// Spawn a task into an executor.
    ///
    /// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy_executor::task]`).
    pub fn spawn<S>(&self, token: SpawnToken<S>) -> Result<(), SpawnError> {
        let task = token.raw_task;
        mem::forget(token);

        match task {
            Some(task) => {
                unsafe { self.executor.spawn(task) };
                Ok(())
            }
            None => Err(SpawnError::Busy),
        }
    }

SpawnToken が何かと言うと、基本 TaskRef を持っているだけっぽいです。 #[embassy_executor::task] を付けた関数を呼び出すと、その戻り値は SpawnTaken になっていて、それを Executor で実行する想定のようです。

pub struct SpawnToken<S> {
    raw_task: Option<raw::TaskRef>,
    phantom: PhantomData<*mut S>,
}

そう言えば、#[embassy_executor::task] をマクロ展開すると、こんな感じでした。

fn run() -> ::embassy_executor::SpawnToken<impl Sized> {
    type Fut = impl ::core::future::Future + 'static;
    const POOL_SIZE: usize = 1;
    static POOL: ::embassy_executor::raw::TaskPool<Fut, POOL_SIZE> = ::embassy_executor::raw::TaskPool::new();
    unsafe { POOL._spawn_async_fn(move || __run_task()) }
}

このあたりが、 Future から TaskRef を作っているのですけど、ちょっと長くなりそうなので、次回にしましょうかね…。

ちょっと戻って、Executor::spawn()SyncExecutor::spanw() を呼び出すようになっていて、タスクの Executor に自分をセットして、 RunQeueue にタスクを入れておしまい!と。

    pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
        task.header().executor.set(Some(self));

        #[cfg(feature = "rtos-trace")]
        trace::task_new(task.as_ptr() as u32);

        self.enqueue(task);
    }

ここまでわかれば、spawn 周りは、タスクの構造がわかれば、おおよそ理解したと言って良い気がします。 ので、次回は、タスクの構造見ていくのをやりましょう。

Rust の組込み用非同期フレームワーク Embassy (3)

組込み向けの async/await フレームワーク embassy をゆるーく眺めていくシリーズです。 前回は main についているマクロがなにやってるか見てみました。

github.com

そして個人的には、 Spawner とか Executor がどうなってるねん!が気になるので、そこ突っ込んでいきましょう。

ということで embassy-executor を見ていきます。

アーキテクチャ固有の実装はかなりコンパクトになっています。 とりあえず Cortex-M の実装 embassy-executor/src/arch/cortex_m.rs でも見てみましょう。

executor-threadexecutor-interrupt があるのですが、一旦 executor-thread に絞って読んでいきます。 /// Thread mode executor, using WFE/SEV. とあるとおり、Cortex-M の WFE 命令と SEV 命令を使って、スリープ / スリープからの復帰をする Executor のようです。

SEV 命令を実行する ThreadPender 構造体が用意されています。 トレイトになっていないのですね?

    #[derive(Copy, Clone)]
    pub(crate) struct ThreadPender;

    impl ThreadPender {
        pub(crate) fn pend(self) {
            unsafe { core::arch::asm!("sev") }
        }
    }

上位側で arch 固有の実装がないとダメになっているし、crate 内にアーキテクチャ固有の実装を全部置いているから、トレイトで縛らなくて良い、って感じなのですかね? pend() だけあれば良いだけですし、そういう選択肢もありですね。

#[derive(Clone, Copy)]
pub(crate) enum PenderInner {
    #[cfg(feature = "executor-thread")]
    Thread(crate::arch::ThreadPender),
    #[cfg(feature = "executor-interrupt")]
    Interrupt(crate::arch::InterruptPender),
    #[cfg(feature = "pender-callback")]
    Callback { func: fn(*mut ()), context: *mut () },
}

Executor も同様に、トレイト使わずに各実装でインターフェース合わせているっぽいですね ((wasm 向けの実装だけ、run がなくて start になっているけど、これで良いのか…?)) 。

これで実装全部です。

    /// Thread mode executor, using WFE/SEV.
    ///
    /// This is the simplest and most common kind of executor. It runs on
    /// thread mode (at the lowest priority level), and uses the `WFE` ARM instruction
    /// to sleep when it has no more work to do. When a task is woken, a `SEV` instruction
    /// is executed, to make the `WFE` exit from sleep and poll the task.
    ///
    /// This executor allows for ultra low power consumption for chips where `WFE`
    /// triggers low-power sleep without extra steps. If your chip requires extra steps,
    /// you may use [`raw::Executor`] directly to program custom behavior.
    pub struct Executor {
        inner: raw::Executor,
        not_send: PhantomData<*mut ()>,
    }

    impl Executor {
        /// Create a new Executor.
        pub fn new() -> Self {
            Self {
                inner: raw::Executor::new(Pender(PenderInner::Thread(ThreadPender))),
                not_send: PhantomData,
            }
        }

        /// Run the executor.
        ///
        /// The `init` closure is called with a [`Spawner`] that spawns tasks on
        /// this executor. Use it to spawn the initial task(s). After `init` returns,
        /// the executor starts running the tasks.
        ///
        /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`),
        /// for example by passing it as an argument to the initial tasks.
        ///
        /// This function requires `&'static mut self`. This means you have to store the
        /// Executor instance in a place where it'll live forever and grants you mutable
        /// access. There's a few ways to do this:
        ///
        /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe)
        /// - a `static mut` (unsafe)
        /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe)
        ///
        /// This function never returns.
        pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
            init(self.inner.spawner());

            loop {
                unsafe {
                    self.inner.poll();
                    asm!("wfe");
                };
            }
        }
    }

run の内容も、 Spawner 初期化して無限ループで、 poll() し終わったら、 WFE 命令で寝る!以上!です。 割り込み受けたり、SVE 命令受けるともう1回 poll() して〜、となるわけですね。

ちなみに poll() の中では RunQueue という実行可能なタスクが積まれている Queue があって、これが空になるまで Task を実行するようになっています。 つまり、poll() から抜けてくると、Executor としては、実行しないといけない async タスクが存在しない状態になっているわけですね。 シンプル!

poll() の実体は、raw::SyncExecutor が持っています。

ちょっと Timer の実装が邪魔なのですけど、それを取り除いちゃうとこれだけです。 RunQueue からタスクを1個ずつ取り出して、実行しているだけですね。

    /// # Safety
    ///
    /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created.
    pub(crate) unsafe fn poll(&'static self) {
        #[allow(clippy::never_loop)]
        loop {
            self.run_queue.dequeue_all(|p| {
                let task = p.header();

                let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
                if state & STATE_SPAWNED == 0 {
                    // If task is not running, ignore it. This can happen in the following scenario:
                    //   - Task gets dequeued, poll starts
                    //   - While task is being polled, it gets woken. It gets placed in the queue.
                    //   - Task poll finishes, returning done=true
                    //   - RUNNING bit is cleared, but the task is already in the queue.
                    return;
                }

                // Run the task
                task.poll_fn.get().unwrap_unchecked()(p);
            });
        }
    }

ここまで来れば、あとは

  • Spawner が Executor にタスクを追加する流れ
  • Waker から pend 呼び出すまでの流れ

あたりがわかれば大まかな仕組みが理解できそうです。

Waker は Rust の core (std) 経由するのは知っているので、 Spawner の方が楽に読めそうでしょうか? 次回は Spawner 読んでみましょう。

Rust の組込み用非同期フレームワーク Embassy (2)

Embassy をゆるーく眺めるシリーズです。 前回 README を見てみて Timer どうなっているのか気になりました。 なので実装をおいかけてみましょう。

アプリコードは大体こんな感じでした。 さてどこから行きましょうかね?

use embassy_executor::Spawner;
use embassy_time::{Duration, Timer};

#[embassy_executor::task]
async fn blink(pin: AnyPin) {
    let mut led = Output::new(pin, Level::Low, OutputDrive::Standard);

    loop {
        // Timekeeping is globally available, no need to mess with hardware timers.
        led.set_high();
        Timer::after(Duration::from_millis(150)).await;
        led.set_low();
        Timer::after(Duration::from_millis(150)).await;
    }
}

#[embassy_executor::main]
async fn main(spawner: Spawner) {
    let p = embassy_nrf::init(Default::default());

    // Spawned tasks run in the background, concurrently.
    spawner.spawn(blink(p.P0_13.degrade())).unwrap();
}

気になるところ片っ端からいってみましょう。 まずは main についてるマクロ!

#[embassy_executor::main]

embassy-macros が中身っぽいですね。

embassy-macros

README を見ます。

The task and main macros require the type alias impl trait (TAIT) nightly feature in order to compile.

なるほど? type alias impl trait ? これか。

rust-lang.github.io

type alias つくるときに impl Trait が指定できるように、ということのようですね。 確かにこれは便利そうです。

type Foo = impl Bar;

何がブロッカーなのかと言うと…?けっこう色々 TODO 残ってますね。 もうちょっと時間かかりそうです。残念。

github.com

マクロの展開結果

とりあえずマクロの実装いきなり見る前に軽く展開結果を見ておきましょう。cargo-expand があれば…!

cargo install cargo-expand

あとは、 examples/std の下のホストでビルドできるバイナリを指定してみます。 examles/std/bin/tick.rs 小さくてちょうど良いのがありました。

#![feature(type_alias_impl_trait)]

use embassy_executor::Spawner;
use embassy_time::{Duration, Timer};
use log::*;

#[embassy_executor::task]
async fn run() {
    loop {
        info!("tick");
        Timer::after(Duration::from_secs(1)).await;
    }
}

#[embassy_executor::main]
async fn main(spawner: Spawner) {
    env_logger::builder()
        .filter_level(log::LevelFilter::Debug)
        .format_timestamp_nanos()
        .init();

    spawner.spawn(run()).unwrap();
}

cargo-expand でマクロ展開します。

cargo expand --bin tick
#![feature(prelude_import)]
#![feature(type_alias_impl_trait)]
#[prelude_import]
use std::prelude::rust_2021::*;
#[macro_use]
extern crate std;
use embassy_executor::Spawner;
use embassy_time::{Duration, Timer};
use log::*;
#[doc(hidden)]
async fn __run_task() {
    loop {
        {
            let lvl = ::log::Level::Info;
            if lvl <= ::log::STATIC_MAX_LEVEL && lvl <= ::log::max_level() {
                ::log::__private_api_log(
                    format_args!("tick"),
                    lvl,
                    &("tick", "tick", "src/bin/tick.rs", 10u32),
                    ::log::__private_api::Option::None,
                );
            }
        };
        Timer::after(Duration::from_secs(1)).await;
    }
}
fn run() -> ::embassy_executor::SpawnToken<impl Sized> {
    type Fut = impl ::core::future::Future + 'static;
    const POOL_SIZE: usize = 1;
    static POOL: ::embassy_executor::raw::TaskPool<Fut, POOL_SIZE> = ::embassy_executor::raw::TaskPool::new();
    unsafe { POOL._spawn_async_fn(move || __run_task()) }
}
#[doc(hidden)]
async fn ____embassy_main_task(spawner: Spawner) {
    {
        env_logger::builder()
            .filter_level(log::LevelFilter::Debug)
            .format_timestamp_nanos()
            .init();
        spawner.spawn(run()).unwrap();
    }
}
fn __embassy_main(spawner: Spawner) -> ::embassy_executor::SpawnToken<impl Sized> {
    type Fut = impl ::core::future::Future + 'static;
    const POOL_SIZE: usize = 1;
    static POOL: ::embassy_executor::raw::TaskPool<Fut, POOL_SIZE> = ::embassy_executor::raw::TaskPool::new();
    unsafe { POOL._spawn_async_fn(move || ____embassy_main_task(spawner)) }
}
unsafe fn __make_static<T>(t: &mut T) -> &'static mut T {
    ::core::mem::transmute(t)
}
fn main() -> ! {
    let mut executor = ::embassy_executor::Executor::new();
    let executor = unsafe { __make_static(&mut executor) };
    executor
        .run(|spawner| {
            spawner.must_spawn(__embassy_main(spawner));
        })
}

そこまで黒魔術じゃなさそうですが…。 一旦気にせず読んでみます。

#[embassy_executor::main] をつけると、その内容は ____embassy_main_task に rename されて、前段に __embassy_mainmain が挟まっています。 このあたりの main をトランポリンするのは組込み Rust だと頻出のテクニックですね。

で、一番最初の main を見ると executor を作って、 executor を static にして、executor.run() から __embassy_main() を呼び出す、という実装になっています。

fn main() -> ! {
    let mut executor = ::embassy_executor::Executor::new();
    let executor = unsafe { __make_static(&mut executor) };
    executor
        .run(|spawner| {
            spawner.must_spawn(__embassy_main(spawner));
        })
}

async の main としては main から戻ることがないので、main 関数のスコープで作ったローカル変数を static にしてしまっても良い、と。 main のシグニチャmain() -> ! となっているので、executor.run() は無限ループになっているのでしょう。 そのうち読む時のために頭の片隅に置いておきましょう。

あと、spawner の詳細も気になるところですが、一旦おいておきましょう。

で、__embassy_main はこう。 ここで、TAIT 使ってますね。

fn __embassy_main(spawner: Spawner) -> ::embassy_executor::SpawnToken<impl Sized> {
    type Fut = impl ::core::future::Future + 'static;
    const POOL_SIZE: usize = 1;
    static POOL: ::embassy_executor::raw::TaskPool<Fut, POOL_SIZE> = ::embassy_executor::raw::TaskPool::new();
    unsafe { POOL._spawn_async_fn(move || ____embassy_main_task(spawner)) }
}

戻り値の型が ::embassy_executor::SpawnToken<impl Sized> 今の知識じゃわからないですね。 pool を作って、async fn を spawn していますが、このあたりも読んでいかないと、ですね。 embassy-executor/src/raw/mod.rs に TaskPool の実装がありました。 次はこのあたり読んでいきましょう。

ではでは。

Rust の組込み用非同期フレームワーク Embassy (1)

個人ブログをあまりに放置するのも良くないですね! 最近組込み Rust の情報発信もご無沙汰*1なので、自分のモチベーションのためにもちょいちょいメモ書きながら Embassy やっていこうと思います。

リポジトリはこちら。 github.com

Web ページもありますね。

embassy.dev

ということで今日は GitHub の README から。

README.md

はい、 Rust + async は組込みにて最強。そういうことがですね。書いてありますね。 仕組み上、素直に考えれば同じタスク上でステートマシン使って処理を切り替えるので、RTOSコンテキストスイッチより早いですよね、と。

Rust async と RTOS とを比較した記事へのリンクがあります。Rust の async の簡単な説明からありそうなので明日にでも読んでみましょう。

tweedegolf.nl

Batteries included

HAL を提供しているのは

リアルタイム処理用の機能も提供していて、priority の異なる複数の executor を作ることができる。 普通の cooperative な仕組みだとできないからね。

async executor はやることがなくと自動的にコアをスリープする。

Network / Bluetooth もサポートがある。なぜか LoRa も。 embassy-net は smoltcp を使っている。smol も見ないと、と思いつつずっと放置しているので見よう。 Bluetooth は nrf-softdevice と embassy-stm32-wpan とがある。 試してみるときは nrf-softdevice をこっそり試してみよう。

USB ドライバがあるのも良い話だ! ブートローダーがあるのは謎いな。なんでだ。

Sneak peek

ざっくりこういうコードになる、という話。 タイマーの初期化とかは embassy_nrf::init() でやってるのかな?

use defmt::info;
use embassy_executor::Spawner;
use embassy_time::{Duration, Timer};
use embassy_nrf::gpio::{AnyPin, Input, Level, Output, OutputDrive, Pin, Pull};
use embassy_nrf::Peripherals;

// Declare async tasks
#[embassy_executor::task]
async fn blink(pin: AnyPin) {
    let mut led = Output::new(pin, Level::Low, OutputDrive::Standard);

    loop {
        // Timekeeping is globally available, no need to mess with hardware timers.
        led.set_high();
        Timer::after(Duration::from_millis(150)).await;
        led.set_low();
        Timer::after(Duration::from_millis(150)).await;
    }
}

// Main is itself an async task as well.
#[embassy_executor::main]
async fn main(spawner: Spawner) {
    let p = embassy_nrf::init(Default::default());

    // Spawned tasks run in the background, concurrently.
    spawner.spawn(blink(p.P0_13.degrade())).unwrap();

    let mut button = Input::new(p.P0_11, Pull::Up);
    loop {
        // Asynchronously wait for GPIO events, allowing other tasks
        // to run, or the core to sleep.
        button.wait_for_low().await;
        info!("Button pressed!");
        button.wait_for_high().await;
        info!("Button released!");
    }
}

Timer::after()Future を実装した Timer オブジェクトを作ってるだけって感じか。なるほど。

    pub fn after(duration: Duration) -> Self {
        Self {
            expires_at: Instant::now() + duration,
            yielded_once: false,
        }
    }
impl Future for Timer {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.yielded_once && self.expires_at <= Instant::now() {
            Poll::Ready(())
        } else {
            schedule_wake(self.expires_at, cx.waker());
            self.yielded_once = true;
            Poll::Pending
        }
    }
}

Why the name?

EMBedded ASYnc! :)

なるほど!

*1:半年前のインターフェース書いたっきり