Rust の組込み用非同期フレームワーク Embassy (4)

ゆるふわ Embassy コードリーディングシリーズです。 前回 Executor がわかってきた気持ちになったので、もう少し潜ってみましょう。

今回は Spawner いってみます。

コメントや構造体定義を見るに、Spawner は特定の Executor に紐付いていることがわかります。 他スレッドからタスクを作りたい場合は SendSpawner を使え、とあります (こちらは一旦放置) 。

/// Handle to spawn tasks into an executor.
///
/// This Spawner can spawn any task (Send and non-Send ones), but it can
/// only be used in the executor thread (it is not Send itself).
///
/// If you want to spawn tasks from another thread, use [SendSpawner].
#[derive(Copy, Clone)]
pub struct Spawner {
    executor: &'static raw::Executor,
    not_send: PhantomData<*mut ()>,
}

今の実行コンテキストから Spawner を取得するメソッドが定義されています。 こういうのはおもしろいですね。

poll_fn は標準ライブラリの関数で、タスクコンテキストを引数にとるクロージャ (関数) を実行できるみたいですね。 コンテキストからタスクを取り出して、タスクから Executor を取り出して…と順番に辿って、 Spawner を作っています。

Poll::Ready で戻り値返してあげれば、poll_fn() の結果として中身が得られる、と。

    /// Get a Spawner for the current executor.
    ///
    /// This function is `async` just to get access to the current async
    /// context. It returns instantly, it does not block/yield.
    ///
    /// # Panics
    ///
    /// Panics if the current executor is not an Embassy executor.
    pub async fn for_current_executor() -> Self {
        poll_fn(|cx| {
            let task = raw::task_from_waker(cx.waker());
            let executor = unsafe { task.header().executor.get().unwrap_unchecked() };
            let executor = unsafe { raw::Executor::wrap(executor) };
            Poll::Ready(Self::new(executor))
        })
        .await
    }

下の1行だけよくわかってないので、深堀りしてみましょう。

let task = raw::task_from_waker(cx.waker());

うーん!けっこう大変な感じなやつが出てきてしまいましたね。 標準ライブラリの RawWaker の構造を見てみます。

/// Get a task pointer from a waker.
///
/// This can be used as an optimization in wait queues to store task pointers
/// (1 word) instead of full Wakers (2 words). This saves a bit of RAM and helps
/// avoid dynamic dispatch.
///
/// You can use the returned task pointer to wake the task with [`wake_task`](super::wake_task).
///
/// # Panics
///
/// Panics if the waker is not created by the Embassy executor.
pub fn task_from_waker(waker: &Waker) -> TaskRef {
    // safety: OK because WakerHack has the same layout as Waker.
    // This is not really guaranteed because the structs are `repr(Rust)`, it is
    // indeed the case in the current implementation.
    // TODO use waker_getters when stable. https://github.com/rust-lang/rust/issues/96992
    let hack: &WakerHack = unsafe { mem::transmute(waker) };
    if hack.vtable != &VTABLE {
        panic!("Found waker not created by the Embassy executor. `embassy_time::Timer` only works with the Embassy executor.")
    }

    // safety: our wakers are always created with `TaskRef::as_ptr`
    unsafe { TaskRef::from_ptr(hack.data as *const TaskHeader) }
}

struct WakerHack {
    data: *const (),
    vtable: &'static RawWakerVTable,
}

コメントにある通り、WakerHack と同じ構造ですね。 この Executor が使える data 部分に Embassy では TaskRef (TaskHeader) を入れていて、vtable には waker.rs で作っている VTABLE の参照を格納している、と。

/// A `RawWaker` allows the implementor of a task executor to create a [`Waker`]
/// which provides customized wakeup behavior.
///
/// [vtable]: https://en.wikipedia.org/wiki/Virtual_method_table
///
/// It consists of a data pointer and a [virtual function pointer table (vtable)][vtable]
/// that customizes the behavior of the `RawWaker`.
#[derive(PartialEq, Debug)]
#[stable(feature = "futures_api", since = "1.36.0")]
pub struct RawWaker {
    /// A data pointer, which can be used to store arbitrary data as required
    /// by the executor. This could be e.g. a type-erased pointer to an `Arc`
    /// that is associated with the task.
    /// The value of this field gets passed to all functions that are part of
    /// the vtable as the first parameter.
    data: *const (),
    /// Virtual function pointer table that customizes the behavior of this waker.
    vtable: &'static RawWakerVTable,
}

Spawner のおもしろメソッド見ていたらだいぶ横道にそれてしまいました。

肝心の spawn() メソッドは何をやっているのかと言うと…。 SpawnToken を引数で受け取って、Executor::spanw() を呼び出しています。

    /// Spawn a task into an executor.
    ///
    /// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy_executor::task]`).
    pub fn spawn<S>(&self, token: SpawnToken<S>) -> Result<(), SpawnError> {
        let task = token.raw_task;
        mem::forget(token);

        match task {
            Some(task) => {
                unsafe { self.executor.spawn(task) };
                Ok(())
            }
            None => Err(SpawnError::Busy),
        }
    }

SpawnToken が何かと言うと、基本 TaskRef を持っているだけっぽいです。 #[embassy_executor::task] を付けた関数を呼び出すと、その戻り値は SpawnTaken になっていて、それを Executor で実行する想定のようです。

pub struct SpawnToken<S> {
    raw_task: Option<raw::TaskRef>,
    phantom: PhantomData<*mut S>,
}

そう言えば、#[embassy_executor::task] をマクロ展開すると、こんな感じでした。

fn run() -> ::embassy_executor::SpawnToken<impl Sized> {
    type Fut = impl ::core::future::Future + 'static;
    const POOL_SIZE: usize = 1;
    static POOL: ::embassy_executor::raw::TaskPool<Fut, POOL_SIZE> = ::embassy_executor::raw::TaskPool::new();
    unsafe { POOL._spawn_async_fn(move || __run_task()) }
}

このあたりが、 Future から TaskRef を作っているのですけど、ちょっと長くなりそうなので、次回にしましょうかね…。

ちょっと戻って、Executor::spawn()SyncExecutor::spanw() を呼び出すようになっていて、タスクの Executor に自分をセットして、 RunQeueue にタスクを入れておしまい!と。

    pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
        task.header().executor.set(Some(self));

        #[cfg(feature = "rtos-trace")]
        trace::task_new(task.as_ptr() as u32);

        self.enqueue(task);
    }

ここまでわかれば、spawn 周りは、タスクの構造がわかれば、おおよそ理解したと言って良い気がします。 ので、次回は、タスクの構造見ていくのをやりましょう。