Rust の組込み用非同期フレームワーク Embassy (6)

誰得 Embassy コードリーディングシリーズです。 Spawner 周りを見終わったので、次は Waker いってみましょう。

Waker はおそらく wake 呼び出す側から見た方がわかりやすいでしょう。 std で動く example の中だと serial.rs でしょうか。

use async_io::Async;
use embassy_executor::Executor;
use embedded_io::asynch::Read;
use log::*;
use nix::sys::termios;
use static_cell::StaticCell;

use self::serial_port::SerialPort;

#[embassy_executor::task]
async fn run() {
    // Open the serial port.
    let baudrate = termios::BaudRate::B115200;
    let port = SerialPort::new("/dev/ttyACM0", baudrate).unwrap();
    //let port = Spy::new(port);

    // Use async_io's reactor for async IO.
    // This demonstrates how embassy's executor can drive futures from another IO library.
    // Essentially, async_io::Async converts from AsRawFd+Read+Write to futures's AsyncRead+AsyncWrite
    let port = Async::new(port).unwrap();

    // We can then use FromStdIo to convert from futures's AsyncRead+AsyncWrite
    // to embedded_io's async Read+Write.
    //
    // This is not really needed, you could write the code below using futures::io directly.
    // It's useful if you want to have portable code across embedded and std.
    let mut port = embedded_io::adapters::FromFutures::new(port);

    info!("Serial opened!");

    loop {
        let mut buf = [0u8; 256];
        let n = port.read(&mut buf).await.unwrap();
        info!("read {:?}", &buf[..n]);
    }
}

と思ったら embassy 用のアダプター層があるせいでちょっと大変ですね。 SerialPortAsync<T> でラップされていて (この時点では futures-io に対応)、さらに embedded-io のアダプターが適用されています、と。

embedded-io のアダプターは FromFutures<T> で、read() を呼ぶと、poll_fn() の中で、アダプターの中身の poll_read() を呼びます、と。

/// Adapter from `futures::io` traits.
pub struct FromFutures<T: ?Sized> {
    inner: T,
}

impl<T: futures::io::AsyncRead + Unpin + ?Sized> crate::asynch::Read for FromFutures<T> {
    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
        poll_fn(|cx| Pin::new(&mut self.inner).poll_read(cx, buf)).await
    }
}

poll_read() はどうなっているかと言うと、さらに中身から read() して、io::ErrorKind::WouldBlock かそれ以外かで戻り値が違ってきます。 データが読み込めたら Poll::Ready でそのデータが返るのと、WouldBlock 以外のエラーでも Poll::Ready でエラーが返りそうですね。 ready! マクロが何かというと?

impl<T: Read> AsyncRead for Async<T> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        loop {
            match (*self).get_mut().read(buf) {
                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
                res => return Poll::Ready(res),
            }
            ready!(self.poll_readable(cx))?;
        }
    }

poll_readable() が返す Poll を unwrap するか Pending を返すようになっています。 1つ上の match が Poll::Ready か io エラーしか返さないので、ここで Pending を返しているだけ、という感じですかね。

/// Unwraps `Poll<T>` or returns [`Pending`][`core::task::Poll::Pending`].
#[macro_export]
macro_rules! ready {
    ($e:expr $(,)?) => {
        match $e {
            core::task::Poll::Ready(t) => t,
            core::task::Poll::Pending => return core::task::Poll::Pending,
        }
    };
}

Async<T>::poll_readable() 見ると io エラー以外は意味のある値が返らなさそうなので、ready! マクロの core::task::Poll::Ready(t) => t は io エラー、それ以外だと Poll::Pending が返りそうです。

impl<T> Async<T> {
// ...
    pub fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.source.poll_readable(cx)
    }

poll_readable() をもう少し下っていくと async_io の Source::poll_readable() -> poll_ready() になります、と。 で、ここで Waker を登録するようです。

impl Source {
    /// Polls the I/O source for readability.
    pub(crate) fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.poll_ready(READ, cx)
    }
// ...
    /// Registers a waker from `poll_readable()` or `poll_writable()`.
    ///
    /// If a different waker is already registered, it gets replaced and woken.
    fn poll_ready(&self, dir: usize, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let mut state = self.state.lock().unwrap();

        // Check if the reactor has delivered an event.
        if let Some((a, b)) = state[dir].ticks {
            // If `state[dir].tick` has changed to a value other than the old reactor tick,
            // that means a newer reactor tick has delivered an event.
            if state[dir].tick != a && state[dir].tick != b {
                state[dir].ticks = None;
                return Poll::Ready(Ok(()));
            }
        }

        let was_empty = state[dir].is_empty();

        // Register the current task's waker.
        if let Some(w) = state[dir].waker.take() {
            if w.will_wake(cx.waker()) {
                state[dir].waker = Some(w);
                return Poll::Pending;
            }
            // Wake the previous waker because it's going to get replaced.
            panic::catch_unwind(|| w.wake()).ok();
        }
        state[dir].waker = Some(cx.waker().clone());
        state[dir].ticks = Some((Reactor::get().ticker(), state[dir].tick));

        // Update interest in this I/O handle.
        if was_empty {
            Reactor::get().poller.modify(
                self.raw,
                Event {
                    key: self.key,
                    readable: !state[READ].is_empty(),
                    writable: !state[WRITE].is_empty(),
                },
            )?;
        }

        Poll::Pending
    }

Waker を clone するか (state[dir].waker = Some(cx.waker().clone());) すでに Waker が登録されていたら即 Pending を返しているみたいです。 あとは、state[dir].wakerwake() を呼び出すところを探せば良さそうです。

async-io の ReactorLock::react の中でやっていそうですね。

impl ReactorLock<'_> {
    /// Processes new events, blocking until the first event or the timeout.
    pub(crate) fn react(&mut self, timeout: Option<Duration>) -> io::Result<()> {
        let mut wakers = Vec::new();
// ...
        // Block on I/O events.
        let res = match self.reactor.poller.wait(&mut self.events, timeout) {
            // No I/O events occurred.
            Ok(0) => {
                if timeout != Some(Duration::from_secs(0)) {
                    // The non-zero timeout was hit so fire ready timers.
                    self.reactor.process_timers(&mut wakers);
                }
                Ok(())
            }

            // At least one I/O event occurred.
            Ok(_) => {
                // Iterate over sources in the event list.
                let sources = self.reactor.sources.lock().unwrap();

                for ev in self.events.iter() {
                    // Check if there is a source in the table with this key.
                    if let Some(source) = sources.get(ev.key) {
                        let mut state = source.state.lock().unwrap();

                        // Collect wakers if a writability event was emitted.
                        for &(dir, emitted) in &[(WRITE, ev.writable), (READ, ev.readable)] {
                            if emitted {
                                state[dir].tick = tick;
                                state[dir].drain_into(&mut wakers);
                            }
                        }
// ...
                    }
                }

                Ok(())
            }
// ...
        };

        // Wake up ready tasks.
        log::trace!("react: {} ready wakers", wakers.len());
        for waker in wakers {
            // Don't let a panicking waker blow everything up.
            panic::catch_unwind(|| waker.wake()).ok();
        }

        res
    }
}

self.reactor.poller.wait(&mut self.events, timeout) の中を見ていくと epoll_wait 呼んでいたので、大体予想通りですね。

これが、マイコンターゲットだと割り込み待ちで、Waker::wake() を呼ぶのでしょう。 nRF あたりの割り込み待ち実装を次回眺めてから、Waker::wake() の中身を追いかければ、大体コアの部分は理解できそうですね。