Rust の組込み用非同期フレームワーク Embassy (7)
自分のためだけに Embassy コードリーディングシリーズです。 nRF52840 向けの Waker を wake している実装を見ていきます。
GPIO 割り込みから wake()
するのが、おそらく最も単純な実装でしょう、ということで GPIO の example を探します。
examples/nrf52840/src/bin/gpiote_port.rs
中身は下のような感じです。
Task Pool が 4 になっていて、同じタスクを複数 spawn できるようになっているようですが、そこは一旦無視しましょう。
pin.wait_for_low()
を見るのが良さそうです。
#[embassy_executor::task(pool_size = 4)] async fn button_task(n: usize, mut pin: Input<'static, AnyPin>) { loop { pin.wait_for_low().await; info!("Button {:?} pressed!", n); pin.wait_for_high().await; info!("Button {:?} released!", n); } } #[embassy_executor::main] async fn main(spawner: Spawner) { let p = embassy_nrf::init(Default::default()); info!("Starting!"); let btn1 = Input::new(p.P0_11.degrade(), Pull::Up); let btn2 = Input::new(p.P0_12.degrade(), Pull::Up); let btn3 = Input::new(p.P0_24.degrade(), Pull::Up); let btn4 = Input::new(p.P0_25.degrade(), Pull::Up); unwrap!(spawner.spawn(button_task(1, btn1))); unwrap!(spawner.spawn(button_task(2, btn2))); unwrap!(spawner.spawn(button_task(3, btn3))); unwrap!(spawner.spawn(button_task(4, btn4))); }
Input
は Flex
のラッパーになっています。
wait_for_low()
を呼び出すと、Flex
の wait_for_low()
を呼びます。
/// GPIO input driver. pub struct Input<'d, T: Pin> { pub(crate) pin: Flex<'d, T>, } impl<'d, T: GpioPin> Input<'d, T> { /// Wait until the pin is low. If it is already low, return immediately. pub async fn wait_for_low(&mut self) { self.pin.wait_for_low().await }
ちなみに Flex
は Flexible pin のことで、組込み Rust では GPIO ピンの状態を型として表現することが多いのですが、どのような状態も取れるピンとして定義されています。
Flex::wait_for_low()
では、GPIO 入力が low になったら割り込みが入るように設定して、PortInputFuture
のインスタンスを作って await しています。
impl<'d, T: GpioPin> Flex<'d, T> { /// Wait until the pin is low. If it is already low, return immediately. pub async fn wait_for_low(&mut self) { self.pin.conf().modify(|_, w| w.sense().low()); PortInputFuture::new(&mut self.pin).await }
重要なのは Future
トレイトを実装している PortInputFuture
ですね。
poll
の実装は次のようになっています。
PORT ごとに Waker を持っていて、register()
で登録するようです。
最後は、GPIO 割り込みが検出されて SENSE が無効になっていたら、Poll::Ready(())
を返しています。
impl<'a> Future for PortInputFuture<'a> { type Output = (); fn poll(self: core::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { PORT_WAKERS[self.pin.pin_port() as usize].register(cx.waker()); if self.pin.conf().read().sense().is_disabled() { Poll::Ready(()) } else { Poll::Pending } } }
register()
の部分ですが、PORT_WAKERS
は AtomicWaker
の static な配列になっており、AtomicWaker
がこうなので、割り込み待ちしている GPIO ピンの Waker を wake したら、このときに登録したコンテキストが run queue に入る、ということになります。
/// Utility struct to register and wake a waker. pub struct AtomicWaker { waker: Mutex<CriticalSectionRawMutex, Cell<Option<Waker>>>, } impl AtomicWaker { /// Register a waker. Overwrites the previous waker, if any. pub fn register(&self, w: &Waker) { critical_section::with(|cs| { let cell = self.waker.borrow(cs); cell.set(match cell.replace(None) { Some(w2) if (w2.will_wake(w)) => Some(w2), _ => Some(w.clone()), }) }) } /// Wake the registered waker, if any. pub fn wake(&self) { critical_section::with(|cs| { let cell = self.waker.borrow(cs); if let Some(w) = cell.replace(None) { w.wake_by_ref(); cell.set(Some(w)); } }) } }
では続いて、GPIO の割り込みハンドラ embassy-nrf/src/gpiote.rs です。
重要なところはコード中にコメントで補足した、wake()
しているところです。
この GPIO 割り込みの中で、wait_for_low()
している実行コンテキストを run queue に入れて、WFE
から復帰すると、無事 wait_for_low()
の続きからプログラムが実行されることになります。
// 抜粋 unsafe fn handle_gpiote_interrupt() { let g: &pac::gpiote::RegisterBlock = regs(); if g.events_port.read().bits() != 0 { g.events_port.write(|w| w); let ports = &[&*pac::P0::ptr(), &*pac::P1::ptr()]; for (port, &p) in ports.iter().enumerate() { let bits = p.latch.read().bits(); for pin in BitIter(bits) { // ここで Waker を wake() している p.pin_cnf[pin as usize].modify(|_, w| w.sense().disabled()); PORT_WAKERS[port * 32 + pin as usize].wake(); } p.latch.write(|w| w.bits(bits)); } } }
ということで、 embassy 自体には他にも機能がありそうですが、最もベースとなる部分の仕組みは大体わかった気になれましたね。
Rust の組込み用非同期フレームワーク Embassy (6)
誰得 Embassy コードリーディングシリーズです。 Spawner 周りを見終わったので、次は Waker いってみましょう。
Waker はおそらく wake 呼び出す側から見た方がわかりやすいでしょう。 std で動く example の中だと serial.rs でしょうか。
use async_io::Async; use embassy_executor::Executor; use embedded_io::asynch::Read; use log::*; use nix::sys::termios; use static_cell::StaticCell; use self::serial_port::SerialPort; #[embassy_executor::task] async fn run() { // Open the serial port. let baudrate = termios::BaudRate::B115200; let port = SerialPort::new("/dev/ttyACM0", baudrate).unwrap(); //let port = Spy::new(port); // Use async_io's reactor for async IO. // This demonstrates how embassy's executor can drive futures from another IO library. // Essentially, async_io::Async converts from AsRawFd+Read+Write to futures's AsyncRead+AsyncWrite let port = Async::new(port).unwrap(); // We can then use FromStdIo to convert from futures's AsyncRead+AsyncWrite // to embedded_io's async Read+Write. // // This is not really needed, you could write the code below using futures::io directly. // It's useful if you want to have portable code across embedded and std. let mut port = embedded_io::adapters::FromFutures::new(port); info!("Serial opened!"); loop { let mut buf = [0u8; 256]; let n = port.read(&mut buf).await.unwrap(); info!("read {:?}", &buf[..n]); } }
と思ったら embassy 用のアダプター層があるせいでちょっと大変ですね。
SerialPort
が Async<T>
でラップされていて (この時点では futures-io
に対応)、さらに embedded-io のアダプターが適用されています、と。
embedded-io のアダプターは FromFutures<T>
で、read()
を呼ぶと、poll_fn()
の中で、アダプターの中身の poll_read()
を呼びます、と。
/// Adapter from `futures::io` traits. pub struct FromFutures<T: ?Sized> { inner: T, } impl<T: futures::io::AsyncRead + Unpin + ?Sized> crate::asynch::Read for FromFutures<T> { async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> { poll_fn(|cx| Pin::new(&mut self.inner).poll_read(cx, buf)).await } }
poll_read()
はどうなっているかと言うと、さらに中身から read()
して、io::ErrorKind::WouldBlock
かそれ以外かで戻り値が違ってきます。
データが読み込めたら Poll::Ready
でそのデータが返るのと、WouldBlock 以外のエラーでも Poll::Ready
でエラーが返りそうですね。
ready!
マクロが何かというと?
impl<T: Read> AsyncRead for Async<T> { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>> { loop { match (*self).get_mut().read(buf) { Err(err) if err.kind() == io::ErrorKind::WouldBlock => {} res => return Poll::Ready(res), } ready!(self.poll_readable(cx))?; } }
poll_readable()
が返す Poll
を unwrap するか Pending を返すようになっています。
1つ上の match が Poll::Ready
か io エラーしか返さないので、ここで Pending を返しているだけ、という感じですかね。
/// Unwraps `Poll<T>` or returns [`Pending`][`core::task::Poll::Pending`]. #[macro_export] macro_rules! ready { ($e:expr $(,)?) => { match $e { core::task::Poll::Ready(t) => t, core::task::Poll::Pending => return core::task::Poll::Pending, } }; }
Async<T>::poll_readable()
見ると io エラー以外は意味のある値が返らなさそうなので、ready!
マクロの core::task::Poll::Ready(t) => t
は io エラー、それ以外だと Poll::Pending
が返りそうです。
impl<T> Async<T> { // ... pub fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { self.source.poll_readable(cx) }
poll_readable()
をもう少し下っていくと async_io の Source::poll_readable()
-> poll_ready()
になります、と。
で、ここで Waker を登録するようです。
impl Source { /// Polls the I/O source for readability. pub(crate) fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { self.poll_ready(READ, cx) } // ... /// Registers a waker from `poll_readable()` or `poll_writable()`. /// /// If a different waker is already registered, it gets replaced and woken. fn poll_ready(&self, dir: usize, cx: &mut Context<'_>) -> Poll<io::Result<()>> { let mut state = self.state.lock().unwrap(); // Check if the reactor has delivered an event. if let Some((a, b)) = state[dir].ticks { // If `state[dir].tick` has changed to a value other than the old reactor tick, // that means a newer reactor tick has delivered an event. if state[dir].tick != a && state[dir].tick != b { state[dir].ticks = None; return Poll::Ready(Ok(())); } } let was_empty = state[dir].is_empty(); // Register the current task's waker. if let Some(w) = state[dir].waker.take() { if w.will_wake(cx.waker()) { state[dir].waker = Some(w); return Poll::Pending; } // Wake the previous waker because it's going to get replaced. panic::catch_unwind(|| w.wake()).ok(); } state[dir].waker = Some(cx.waker().clone()); state[dir].ticks = Some((Reactor::get().ticker(), state[dir].tick)); // Update interest in this I/O handle. if was_empty { Reactor::get().poller.modify( self.raw, Event { key: self.key, readable: !state[READ].is_empty(), writable: !state[WRITE].is_empty(), }, )?; } Poll::Pending }
Waker を clone するか (state[dir].waker = Some(cx.waker().clone());
) すでに Waker が登録されていたら即 Pending を返しているみたいです。
あとは、state[dir].waker
の wake()
を呼び出すところを探せば良さそうです。
async-io の ReactorLock::react
の中でやっていそうですね。
impl ReactorLock<'_> { /// Processes new events, blocking until the first event or the timeout. pub(crate) fn react(&mut self, timeout: Option<Duration>) -> io::Result<()> { let mut wakers = Vec::new(); // ... // Block on I/O events. let res = match self.reactor.poller.wait(&mut self.events, timeout) { // No I/O events occurred. Ok(0) => { if timeout != Some(Duration::from_secs(0)) { // The non-zero timeout was hit so fire ready timers. self.reactor.process_timers(&mut wakers); } Ok(()) } // At least one I/O event occurred. Ok(_) => { // Iterate over sources in the event list. let sources = self.reactor.sources.lock().unwrap(); for ev in self.events.iter() { // Check if there is a source in the table with this key. if let Some(source) = sources.get(ev.key) { let mut state = source.state.lock().unwrap(); // Collect wakers if a writability event was emitted. for &(dir, emitted) in &[(WRITE, ev.writable), (READ, ev.readable)] { if emitted { state[dir].tick = tick; state[dir].drain_into(&mut wakers); } } // ... } } Ok(()) } // ... }; // Wake up ready tasks. log::trace!("react: {} ready wakers", wakers.len()); for waker in wakers { // Don't let a panicking waker blow everything up. panic::catch_unwind(|| waker.wake()).ok(); } res } }
self.reactor.poller.wait(&mut self.events, timeout)
の中を見ていくと epoll_wait
呼んでいたので、大体予想通りですね。
これが、マイコンターゲットだと割り込み待ちで、Waker::wake()
を呼ぶのでしょう。
nRF あたりの割り込み待ち実装を次回眺めてから、Waker::wake()
の中身を追いかければ、大体コアの部分は理解できそうですね。
Rust の組込み用非同期フレームワーク Embassy (5)
ゆるふわ Embassy コードリーディングシリーズです。 前回 spawn 周りはあとタスクの構造だけ、というところまで来たのでタスクの構造を見てみましょう。
タスク関連の構造体としては
- TaskHeader
- TaskStorage
- TaskRef
があります。このへん で定義されています。
TaskHeader は次のように定義されています。 日本語コメントは私の注釈です。
/// Raw task header for use in task pointers. pub(crate) struct TaskHeader { // spawn されたかどうかの状態 pub(crate) state: AtomicU32, // RunQueue に入っている次の TaskHeader への参照 pub(crate) run_queue_item: RunQueueItem, // タスクを実行する Executor への参照 pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>, // poll 時に呼び出す関数ポインタ。実体は `TaskStorage::poll` poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>, }
TaskStrorage は TaskHeader + Future です。
TaskStorage::poll()
が Future::poll()
を呼び出しています。
#[repr(C)] pub struct TaskStorage<F: Future + 'static> { raw: TaskHeader, future: UninitCell<F>, // Valid if STATE_SPAWNED } impl<F: Future + 'static> TaskStorage<F> { const NEW: Self = Self::new(); // ... unsafe fn poll(p: TaskRef) { let this = &*(p.as_ptr() as *const TaskStorage<F>); let future = Pin::new_unchecked(this.future.as_mut()); let waker = waker::from_task(p); let mut cx = Context::from_waker(&waker); match future.poll(&mut cx) { Poll::Ready(_) => { this.future.drop_in_place(); this.raw.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel); #[cfg(feature = "integrated-timers")] this.raw.expires_at.set(Instant::MAX); } Poll::Pending => {} } // the compiler is emitting a virtual call for waker drop, but we know // it's a noop for our waker. mem::forget(waker); }
poll()
のパスを辿ると、こんな感じになりそうです。
Executor(SyncExecutor)::poll()
TaskHeader::poll_fn()
(実体はTaskStorage::poll()
Future::poll()
TaskRef
は TaskHeader
の参照を持っていますが、コメントにある通り、実質は TaskStorage
への参照です (ただし Future
は型は消えています) 。
/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased. #[derive(Clone, Copy)] pub struct TaskRef { ptr: NonNull<TaskHeader>, } impl TaskRef { fn new<F: Future + 'static>(task: &'static TaskStorage<F>) -> Self { Self { ptr: NonNull::from(task).cast(), } } /// Safety: The pointer must have been obtained with `Task::as_ptr` pub(crate) unsafe fn from_ptr(ptr: *const TaskHeader) -> Self { Self { ptr: NonNull::new_unchecked(ptr as *mut TaskHeader), } } pub(crate) fn header(self) -> &'static TaskHeader { unsafe { self.ptr.as_ref() } } /// The returned pointer is valid for the entire TaskStorage. pub(crate) fn as_ptr(self) -> *const TaskHeader { self.ptr.as_ptr() } }
一時、TaskStorage::poll()
で Poll::Ready
のときの値捨てちゃってるけど、async 関数の戻り値ある場合はどうするのでしょう?
と思ったりしましたが、これはあくまでも、embassy_executor の Task を実行しているだけで、embassy_executor の Task は戻り値を持たないのでしょう、という理解に到達しました。
examples/std/src/bin/serial.rs
では Task の中で、async 関数を呼んで、シリアルから読み取ったデータを戻り値として受け取っています。
#[embassy_executor::task] async fn run() { // Open the serial port. let baudrate = termios::BaudRate::B115200; let port = SerialPort::new("/dev/ttyACM0", baudrate).unwrap(); //let port = Spy::new(port); // Use async_io's reactor for async IO. // This demonstrates how embassy's executor can drive futures from another IO library. // Essentially, async_io::Async converts from AsRawFd+Read+Write to futures's AsyncRead+AsyncWrite let port = Async::new(port).unwrap(); // We can then use FromStdIo to convert from futures's AsyncRead+AsyncWrite // to embedded_io's async Read+Write. // // This is not really needed, you could write the code below using futures::io directly. // It's useful if you want to have portable code across embedded and std. let mut port = embedded_io::adapters::FromFutures::new(port); info!("Serial opened!"); loop { let mut buf = [0u8; 256]; let n = port.read(&mut buf).await.unwrap(); info!("read {:?}", &buf[..n]); } }
Future についてはこちらのブログがとてもよくまとまっていて、助かりました。
次は Waker 見ていきましょうね。
Rust の組込み用非同期フレームワーク Embassy (4)
ゆるふわ Embassy コードリーディングシリーズです。 前回 Executor がわかってきた気持ちになったので、もう少し潜ってみましょう。
今回は Spawner いってみます。
コメントや構造体定義を見るに、Spawner
は特定の Executor
に紐付いていることがわかります。
他スレッドからタスクを作りたい場合は SendSpawner
を使え、とあります (こちらは一旦放置) 。
/// Handle to spawn tasks into an executor. /// /// This Spawner can spawn any task (Send and non-Send ones), but it can /// only be used in the executor thread (it is not Send itself). /// /// If you want to spawn tasks from another thread, use [SendSpawner]. #[derive(Copy, Clone)] pub struct Spawner { executor: &'static raw::Executor, not_send: PhantomData<*mut ()>, }
今の実行コンテキストから Spawner
を取得するメソッドが定義されています。
こういうのはおもしろいですね。
poll_fn は標準ライブラリの関数で、タスクコンテキストを引数にとるクロージャ (関数) を実行できるみたいですね。 コンテキストからタスクを取り出して、タスクから Executor を取り出して…と順番に辿って、 Spawner を作っています。
Poll::Ready
で戻り値返してあげれば、poll_fn()
の結果として中身が得られる、と。
/// Get a Spawner for the current executor. /// /// This function is `async` just to get access to the current async /// context. It returns instantly, it does not block/yield. /// /// # Panics /// /// Panics if the current executor is not an Embassy executor. pub async fn for_current_executor() -> Self { poll_fn(|cx| { let task = raw::task_from_waker(cx.waker()); let executor = unsafe { task.header().executor.get().unwrap_unchecked() }; let executor = unsafe { raw::Executor::wrap(executor) }; Poll::Ready(Self::new(executor)) }) .await }
下の1行だけよくわかってないので、深堀りしてみましょう。
let task = raw::task_from_waker(cx.waker());
うーん!けっこう大変な感じなやつが出てきてしまいましたね。
標準ライブラリの RawWaker
の構造を見てみます。
/// Get a task pointer from a waker. /// /// This can be used as an optimization in wait queues to store task pointers /// (1 word) instead of full Wakers (2 words). This saves a bit of RAM and helps /// avoid dynamic dispatch. /// /// You can use the returned task pointer to wake the task with [`wake_task`](super::wake_task). /// /// # Panics /// /// Panics if the waker is not created by the Embassy executor. pub fn task_from_waker(waker: &Waker) -> TaskRef { // safety: OK because WakerHack has the same layout as Waker. // This is not really guaranteed because the structs are `repr(Rust)`, it is // indeed the case in the current implementation. // TODO use waker_getters when stable. https://github.com/rust-lang/rust/issues/96992 let hack: &WakerHack = unsafe { mem::transmute(waker) }; if hack.vtable != &VTABLE { panic!("Found waker not created by the Embassy executor. `embassy_time::Timer` only works with the Embassy executor.") } // safety: our wakers are always created with `TaskRef::as_ptr` unsafe { TaskRef::from_ptr(hack.data as *const TaskHeader) } } struct WakerHack { data: *const (), vtable: &'static RawWakerVTable, }
コメントにある通り、WakerHack
と同じ構造ですね。
この Executor
が使える data
部分に Embassy では TaskRef
(TaskHeader
) を入れていて、vtable
には waker.rs で作っている VTABLE
の参照を格納している、と。
/// A `RawWaker` allows the implementor of a task executor to create a [`Waker`] /// which provides customized wakeup behavior. /// /// [vtable]: https://en.wikipedia.org/wiki/Virtual_method_table /// /// It consists of a data pointer and a [virtual function pointer table (vtable)][vtable] /// that customizes the behavior of the `RawWaker`. #[derive(PartialEq, Debug)] #[stable(feature = "futures_api", since = "1.36.0")] pub struct RawWaker { /// A data pointer, which can be used to store arbitrary data as required /// by the executor. This could be e.g. a type-erased pointer to an `Arc` /// that is associated with the task. /// The value of this field gets passed to all functions that are part of /// the vtable as the first parameter. data: *const (), /// Virtual function pointer table that customizes the behavior of this waker. vtable: &'static RawWakerVTable, }
Spawner
のおもしろメソッド見ていたらだいぶ横道にそれてしまいました。
肝心の spawn()
メソッドは何をやっているのかと言うと…。
SpawnToken
を引数で受け取って、Executor::spanw()
を呼び出しています。
/// Spawn a task into an executor. /// /// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy_executor::task]`). pub fn spawn<S>(&self, token: SpawnToken<S>) -> Result<(), SpawnError> { let task = token.raw_task; mem::forget(token); match task { Some(task) => { unsafe { self.executor.spawn(task) }; Ok(()) } None => Err(SpawnError::Busy), } }
SpawnToken
が何かと言うと、基本 TaskRef
を持っているだけっぽいです。
#[embassy_executor::task]
を付けた関数を呼び出すと、その戻り値は SpawnTaken
になっていて、それを Executor
で実行する想定のようです。
pub struct SpawnToken<S> { raw_task: Option<raw::TaskRef>, phantom: PhantomData<*mut S>, }
そう言えば、#[embassy_executor::task]
をマクロ展開すると、こんな感じでした。
fn run() -> ::embassy_executor::SpawnToken<impl Sized> { type Fut = impl ::core::future::Future + 'static; const POOL_SIZE: usize = 1; static POOL: ::embassy_executor::raw::TaskPool<Fut, POOL_SIZE> = ::embassy_executor::raw::TaskPool::new(); unsafe { POOL._spawn_async_fn(move || __run_task()) } }
このあたりが、 Future
から TaskRef
を作っているのですけど、ちょっと長くなりそうなので、次回にしましょうかね…。
ちょっと戻って、Executor::spawn()
は SyncExecutor::spanw()
を呼び出すようになっていて、タスクの Executor に自分をセットして、 RunQeueue
にタスクを入れておしまい!と。
pub(super) unsafe fn spawn(&'static self, task: TaskRef) { task.header().executor.set(Some(self)); #[cfg(feature = "rtos-trace")] trace::task_new(task.as_ptr() as u32); self.enqueue(task); }
ここまでわかれば、spawn 周りは、タスクの構造がわかれば、おおよそ理解したと言って良い気がします。 ので、次回は、タスクの構造見ていくのをやりましょう。
Rust の組込み用非同期フレームワーク Embassy (3)
組込み向けの async/await フレームワーク embassy をゆるーく眺めていくシリーズです。 前回は main についているマクロがなにやってるか見てみました。
そして個人的には、 Spawner とか Executor がどうなってるねん!が気になるので、そこ突っ込んでいきましょう。
ということで embassy-executor を見ていきます。
アーキテクチャ固有の実装はかなりコンパクトになっています。 とりあえず Cortex-M の実装 embassy-executor/src/arch/cortex_m.rs でも見てみましょう。
executor-thread
と executor-interrupt
があるのですが、一旦 executor-thread
に絞って読んでいきます。
/// Thread mode executor, using WFE/SEV.
とあるとおり、Cortex-M の WFE 命令と SEV 命令を使って、スリープ / スリープからの復帰をする Executor のようです。
SEV 命令を実行する ThreadPender
構造体が用意されています。
トレイトになっていないのですね?
#[derive(Copy, Clone)] pub(crate) struct ThreadPender; impl ThreadPender { pub(crate) fn pend(self) { unsafe { core::arch::asm!("sev") } } }
上位側で arch 固有の実装がないとダメになっているし、crate 内にアーキテクチャ固有の実装を全部置いているから、トレイトで縛らなくて良い、って感じなのですかね?
pend()
だけあれば良いだけですし、そういう選択肢もありですね。
#[derive(Clone, Copy)] pub(crate) enum PenderInner { #[cfg(feature = "executor-thread")] Thread(crate::arch::ThreadPender), #[cfg(feature = "executor-interrupt")] Interrupt(crate::arch::InterruptPender), #[cfg(feature = "pender-callback")] Callback { func: fn(*mut ()), context: *mut () }, }
Executor
も同様に、トレイト使わずに各実装でインターフェース合わせているっぽいですね ((wasm 向けの実装だけ、run
がなくて start
になっているけど、これで良いのか…?)) 。
これで実装全部です。
/// Thread mode executor, using WFE/SEV. /// /// This is the simplest and most common kind of executor. It runs on /// thread mode (at the lowest priority level), and uses the `WFE` ARM instruction /// to sleep when it has no more work to do. When a task is woken, a `SEV` instruction /// is executed, to make the `WFE` exit from sleep and poll the task. /// /// This executor allows for ultra low power consumption for chips where `WFE` /// triggers low-power sleep without extra steps. If your chip requires extra steps, /// you may use [`raw::Executor`] directly to program custom behavior. pub struct Executor { inner: raw::Executor, not_send: PhantomData<*mut ()>, } impl Executor { /// Create a new Executor. pub fn new() -> Self { Self { inner: raw::Executor::new(Pender(PenderInner::Thread(ThreadPender))), not_send: PhantomData, } } /// Run the executor. /// /// The `init` closure is called with a [`Spawner`] that spawns tasks on /// this executor. Use it to spawn the initial task(s). After `init` returns, /// the executor starts running the tasks. /// /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), /// for example by passing it as an argument to the initial tasks. /// /// This function requires `&'static mut self`. This means you have to store the /// Executor instance in a place where it'll live forever and grants you mutable /// access. There's a few ways to do this: /// /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe) /// - a `static mut` (unsafe) /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) /// /// This function never returns. pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { init(self.inner.spawner()); loop { unsafe { self.inner.poll(); asm!("wfe"); }; } } }
run
の内容も、 Spawner
初期化して無限ループで、 poll()
し終わったら、 WFE 命令で寝る!以上!です。
割り込み受けたり、SVE 命令受けるともう1回 poll()
して〜、となるわけですね。
ちなみに poll()
の中では RunQueue
という実行可能なタスクが積まれている Queue があって、これが空になるまで Task を実行するようになっています。
つまり、poll()
から抜けてくると、Executor としては、実行しないといけない async タスクが存在しない状態になっているわけですね。
シンプル!
poll()
の実体は、raw::SyncExecutor が持っています。
ちょっと Timer の実装が邪魔なのですけど、それを取り除いちゃうとこれだけです。
RunQueue
からタスクを1個ずつ取り出して、実行しているだけですね。
/// # Safety /// /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created. pub(crate) unsafe fn poll(&'static self) { #[allow(clippy::never_loop)] loop { self.run_queue.dequeue_all(|p| { let task = p.header(); let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); if state & STATE_SPAWNED == 0 { // If task is not running, ignore it. This can happen in the following scenario: // - Task gets dequeued, poll starts // - While task is being polled, it gets woken. It gets placed in the queue. // - Task poll finishes, returning done=true // - RUNNING bit is cleared, but the task is already in the queue. return; } // Run the task task.poll_fn.get().unwrap_unchecked()(p); }); } }
ここまで来れば、あとは
- Spawner が Executor にタスクを追加する流れ
- Waker から pend 呼び出すまでの流れ
あたりがわかれば大まかな仕組みが理解できそうです。
Waker は Rust の core (std) 経由するのは知っているので、 Spawner の方が楽に読めそうでしょうか? 次回は Spawner 読んでみましょう。
Rust の組込み用非同期フレームワーク Embassy (2)
Embassy をゆるーく眺めるシリーズです。 前回 README を見てみて Timer どうなっているのか気になりました。 なので実装をおいかけてみましょう。
アプリコードは大体こんな感じでした。 さてどこから行きましょうかね?
use embassy_executor::Spawner; use embassy_time::{Duration, Timer}; #[embassy_executor::task] async fn blink(pin: AnyPin) { let mut led = Output::new(pin, Level::Low, OutputDrive::Standard); loop { // Timekeeping is globally available, no need to mess with hardware timers. led.set_high(); Timer::after(Duration::from_millis(150)).await; led.set_low(); Timer::after(Duration::from_millis(150)).await; } } #[embassy_executor::main] async fn main(spawner: Spawner) { let p = embassy_nrf::init(Default::default()); // Spawned tasks run in the background, concurrently. spawner.spawn(blink(p.P0_13.degrade())).unwrap(); }
気になるところ片っ端からいってみましょう。
まずは main
についてるマクロ!
#[embassy_executor::main]
embassy-macros が中身っぽいですね。
embassy-macros
README を見ます。
The
task
andmain
macros require the type alias impl trait (TAIT) nightly feature in order to compile.
なるほど? type alias impl trait ? これか。
type alias つくるときに impl Trait
が指定できるように、ということのようですね。
確かにこれは便利そうです。
type Foo = impl Bar;
何がブロッカーなのかと言うと…?けっこう色々 TODO 残ってますね。 もうちょっと時間かかりそうです。残念。
マクロの展開結果
とりあえずマクロの実装いきなり見る前に軽く展開結果を見ておきましょう。cargo-expand
があれば…!
cargo install cargo-expand
あとは、 examples/std
の下のホストでビルドできるバイナリを指定してみます。
examles/std/bin/tick.rs 小さくてちょうど良いのがありました。
#![feature(type_alias_impl_trait)] use embassy_executor::Spawner; use embassy_time::{Duration, Timer}; use log::*; #[embassy_executor::task] async fn run() { loop { info!("tick"); Timer::after(Duration::from_secs(1)).await; } } #[embassy_executor::main] async fn main(spawner: Spawner) { env_logger::builder() .filter_level(log::LevelFilter::Debug) .format_timestamp_nanos() .init(); spawner.spawn(run()).unwrap(); }
cargo-expand でマクロ展開します。
cargo expand --bin tick
#![feature(prelude_import)] #![feature(type_alias_impl_trait)] #[prelude_import] use std::prelude::rust_2021::*; #[macro_use] extern crate std; use embassy_executor::Spawner; use embassy_time::{Duration, Timer}; use log::*; #[doc(hidden)] async fn __run_task() { loop { { let lvl = ::log::Level::Info; if lvl <= ::log::STATIC_MAX_LEVEL && lvl <= ::log::max_level() { ::log::__private_api_log( format_args!("tick"), lvl, &("tick", "tick", "src/bin/tick.rs", 10u32), ::log::__private_api::Option::None, ); } }; Timer::after(Duration::from_secs(1)).await; } } fn run() -> ::embassy_executor::SpawnToken<impl Sized> { type Fut = impl ::core::future::Future + 'static; const POOL_SIZE: usize = 1; static POOL: ::embassy_executor::raw::TaskPool<Fut, POOL_SIZE> = ::embassy_executor::raw::TaskPool::new(); unsafe { POOL._spawn_async_fn(move || __run_task()) } } #[doc(hidden)] async fn ____embassy_main_task(spawner: Spawner) { { env_logger::builder() .filter_level(log::LevelFilter::Debug) .format_timestamp_nanos() .init(); spawner.spawn(run()).unwrap(); } } fn __embassy_main(spawner: Spawner) -> ::embassy_executor::SpawnToken<impl Sized> { type Fut = impl ::core::future::Future + 'static; const POOL_SIZE: usize = 1; static POOL: ::embassy_executor::raw::TaskPool<Fut, POOL_SIZE> = ::embassy_executor::raw::TaskPool::new(); unsafe { POOL._spawn_async_fn(move || ____embassy_main_task(spawner)) } } unsafe fn __make_static<T>(t: &mut T) -> &'static mut T { ::core::mem::transmute(t) } fn main() -> ! { let mut executor = ::embassy_executor::Executor::new(); let executor = unsafe { __make_static(&mut executor) }; executor .run(|spawner| { spawner.must_spawn(__embassy_main(spawner)); }) }
そこまで黒魔術じゃなさそうですが…。 一旦気にせず読んでみます。
#[embassy_executor::main]
をつけると、その内容は ____embassy_main_task
に rename されて、前段に __embassy_main
と main
が挟まっています。
このあたりの main をトランポリンするのは組込み Rust だと頻出のテクニックですね。
で、一番最初の main
を見ると executor
を作って、 executor
を static にして、executor.run()
から __embassy_main()
を呼び出す、という実装になっています。
fn main() -> ! { let mut executor = ::embassy_executor::Executor::new(); let executor = unsafe { __make_static(&mut executor) }; executor .run(|spawner| { spawner.must_spawn(__embassy_main(spawner)); }) }
async の main としては main から戻ることがないので、main 関数のスコープで作ったローカル変数を static にしてしまっても良い、と。
main のシグニチャが main() -> !
となっているので、executor.run()
は無限ループになっているのでしょう。
そのうち読む時のために頭の片隅に置いておきましょう。
あと、spawner
の詳細も気になるところですが、一旦おいておきましょう。
で、__embassy_main
はこう。
ここで、TAIT 使ってますね。
fn __embassy_main(spawner: Spawner) -> ::embassy_executor::SpawnToken<impl Sized> { type Fut = impl ::core::future::Future + 'static; const POOL_SIZE: usize = 1; static POOL: ::embassy_executor::raw::TaskPool<Fut, POOL_SIZE> = ::embassy_executor::raw::TaskPool::new(); unsafe { POOL._spawn_async_fn(move || ____embassy_main_task(spawner)) } }
戻り値の型が ::embassy_executor::SpawnToken<impl Sized>
今の知識じゃわからないですね。
pool を作って、async fn を spawn していますが、このあたりも読んでいかないと、ですね。
embassy-executor/src/raw/mod.rs
に TaskPool の実装がありました。
次はこのあたり読んでいきましょう。
ではでは。
Rust の組込み用非同期フレームワーク Embassy (1)
個人ブログをあまりに放置するのも良くないですね! 最近組込み Rust の情報発信もご無沙汰*1なので、自分のモチベーションのためにもちょいちょいメモ書きながら Embassy やっていこうと思います。
リポジトリはこちら。 github.com
Web ページもありますね。
ということで今日は GitHub の README から。
README.md
はい、 Rust + async は組込みにて最強。そういうことがですね。書いてありますね。 仕組み上、素直に考えれば同じタスク上でステートマシン使って処理を切り替えるので、RTOS のコンテキストスイッチより早いですよね、と。
Rust async と RTOS とを比較した記事へのリンクがあります。Rust の async の簡単な説明からありそうなので明日にでも読んでみましょう。
Batteries included
HAL を提供しているのは
- STM32
- nRF
- Raspberry Pi RP2040
- ESP32 (esp-rs の方)
リアルタイム処理用の機能も提供していて、priority の異なる複数の executor を作ることができる。 普通の cooperative な仕組みだとできないからね。
async executor はやることがなくと自動的にコアをスリープする。
Network / Bluetooth もサポートがある。なぜか LoRa も。 embassy-net は smoltcp を使っている。smol も見ないと、と思いつつずっと放置しているので見よう。 Bluetooth は nrf-softdevice と embassy-stm32-wpan とがある。 試してみるときは nrf-softdevice をこっそり試してみよう。
USB ドライバがあるのも良い話だ! ブートローダーがあるのは謎いな。なんでだ。
Sneak peek
ざっくりこういうコードになる、という話。
タイマーの初期化とかは embassy_nrf::init()
でやってるのかな?
use defmt::info; use embassy_executor::Spawner; use embassy_time::{Duration, Timer}; use embassy_nrf::gpio::{AnyPin, Input, Level, Output, OutputDrive, Pin, Pull}; use embassy_nrf::Peripherals; // Declare async tasks #[embassy_executor::task] async fn blink(pin: AnyPin) { let mut led = Output::new(pin, Level::Low, OutputDrive::Standard); loop { // Timekeeping is globally available, no need to mess with hardware timers. led.set_high(); Timer::after(Duration::from_millis(150)).await; led.set_low(); Timer::after(Duration::from_millis(150)).await; } } // Main is itself an async task as well. #[embassy_executor::main] async fn main(spawner: Spawner) { let p = embassy_nrf::init(Default::default()); // Spawned tasks run in the background, concurrently. spawner.spawn(blink(p.P0_13.degrade())).unwrap(); let mut button = Input::new(p.P0_11, Pull::Up); loop { // Asynchronously wait for GPIO events, allowing other tasks // to run, or the core to sleep. button.wait_for_low().await; info!("Button pressed!"); button.wait_for_high().await; info!("Button released!"); } }
Timer::after()
は Future
を実装した Timer オブジェクトを作ってるだけって感じか。なるほど。
pub fn after(duration: Duration) -> Self { Self { expires_at: Instant::now() + duration, yielded_once: false, } }
impl Future for Timer { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { if self.yielded_once && self.expires_at <= Instant::now() { Poll::Ready(()) } else { schedule_wake(self.expires_at, cx.waker()); self.yielded_once = true; Poll::Pending } } }
Why the name?
EMBedded ASYnc! :)
なるほど!
*1:半年前のインターフェース書いたっきり