Rust の組込み用非同期フレームワーク Embassy (6)
誰得 Embassy コードリーディングシリーズです。 Spawner 周りを見終わったので、次は Waker いってみましょう。
Waker はおそらく wake 呼び出す側から見た方がわかりやすいでしょう。 std で動く example の中だと serial.rs でしょうか。
use async_io::Async; use embassy_executor::Executor; use embedded_io::asynch::Read; use log::*; use nix::sys::termios; use static_cell::StaticCell; use self::serial_port::SerialPort; #[embassy_executor::task] async fn run() { // Open the serial port. let baudrate = termios::BaudRate::B115200; let port = SerialPort::new("/dev/ttyACM0", baudrate).unwrap(); //let port = Spy::new(port); // Use async_io's reactor for async IO. // This demonstrates how embassy's executor can drive futures from another IO library. // Essentially, async_io::Async converts from AsRawFd+Read+Write to futures's AsyncRead+AsyncWrite let port = Async::new(port).unwrap(); // We can then use FromStdIo to convert from futures's AsyncRead+AsyncWrite // to embedded_io's async Read+Write. // // This is not really needed, you could write the code below using futures::io directly. // It's useful if you want to have portable code across embedded and std. let mut port = embedded_io::adapters::FromFutures::new(port); info!("Serial opened!"); loop { let mut buf = [0u8; 256]; let n = port.read(&mut buf).await.unwrap(); info!("read {:?}", &buf[..n]); } }
と思ったら embassy 用のアダプター層があるせいでちょっと大変ですね。
SerialPort
が Async<T>
でラップされていて (この時点では futures-io
に対応)、さらに embedded-io のアダプターが適用されています、と。
embedded-io のアダプターは FromFutures<T>
で、read()
を呼ぶと、poll_fn()
の中で、アダプターの中身の poll_read()
を呼びます、と。
/// Adapter from `futures::io` traits. pub struct FromFutures<T: ?Sized> { inner: T, } impl<T: futures::io::AsyncRead + Unpin + ?Sized> crate::asynch::Read for FromFutures<T> { async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> { poll_fn(|cx| Pin::new(&mut self.inner).poll_read(cx, buf)).await } }
poll_read()
はどうなっているかと言うと、さらに中身から read()
して、io::ErrorKind::WouldBlock
かそれ以外かで戻り値が違ってきます。
データが読み込めたら Poll::Ready
でそのデータが返るのと、WouldBlock 以外のエラーでも Poll::Ready
でエラーが返りそうですね。
ready!
マクロが何かというと?
impl<T: Read> AsyncRead for Async<T> { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>> { loop { match (*self).get_mut().read(buf) { Err(err) if err.kind() == io::ErrorKind::WouldBlock => {} res => return Poll::Ready(res), } ready!(self.poll_readable(cx))?; } }
poll_readable()
が返す Poll
を unwrap するか Pending を返すようになっています。
1つ上の match が Poll::Ready
か io エラーしか返さないので、ここで Pending を返しているだけ、という感じですかね。
/// Unwraps `Poll<T>` or returns [`Pending`][`core::task::Poll::Pending`]. #[macro_export] macro_rules! ready { ($e:expr $(,)?) => { match $e { core::task::Poll::Ready(t) => t, core::task::Poll::Pending => return core::task::Poll::Pending, } }; }
Async<T>::poll_readable()
見ると io エラー以外は意味のある値が返らなさそうなので、ready!
マクロの core::task::Poll::Ready(t) => t
は io エラー、それ以外だと Poll::Pending
が返りそうです。
impl<T> Async<T> { // ... pub fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { self.source.poll_readable(cx) }
poll_readable()
をもう少し下っていくと async_io の Source::poll_readable()
-> poll_ready()
になります、と。
で、ここで Waker を登録するようです。
impl Source { /// Polls the I/O source for readability. pub(crate) fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { self.poll_ready(READ, cx) } // ... /// Registers a waker from `poll_readable()` or `poll_writable()`. /// /// If a different waker is already registered, it gets replaced and woken. fn poll_ready(&self, dir: usize, cx: &mut Context<'_>) -> Poll<io::Result<()>> { let mut state = self.state.lock().unwrap(); // Check if the reactor has delivered an event. if let Some((a, b)) = state[dir].ticks { // If `state[dir].tick` has changed to a value other than the old reactor tick, // that means a newer reactor tick has delivered an event. if state[dir].tick != a && state[dir].tick != b { state[dir].ticks = None; return Poll::Ready(Ok(())); } } let was_empty = state[dir].is_empty(); // Register the current task's waker. if let Some(w) = state[dir].waker.take() { if w.will_wake(cx.waker()) { state[dir].waker = Some(w); return Poll::Pending; } // Wake the previous waker because it's going to get replaced. panic::catch_unwind(|| w.wake()).ok(); } state[dir].waker = Some(cx.waker().clone()); state[dir].ticks = Some((Reactor::get().ticker(), state[dir].tick)); // Update interest in this I/O handle. if was_empty { Reactor::get().poller.modify( self.raw, Event { key: self.key, readable: !state[READ].is_empty(), writable: !state[WRITE].is_empty(), }, )?; } Poll::Pending }
Waker を clone するか (state[dir].waker = Some(cx.waker().clone());
) すでに Waker が登録されていたら即 Pending を返しているみたいです。
あとは、state[dir].waker
の wake()
を呼び出すところを探せば良さそうです。
async-io の ReactorLock::react
の中でやっていそうですね。
impl ReactorLock<'_> { /// Processes new events, blocking until the first event or the timeout. pub(crate) fn react(&mut self, timeout: Option<Duration>) -> io::Result<()> { let mut wakers = Vec::new(); // ... // Block on I/O events. let res = match self.reactor.poller.wait(&mut self.events, timeout) { // No I/O events occurred. Ok(0) => { if timeout != Some(Duration::from_secs(0)) { // The non-zero timeout was hit so fire ready timers. self.reactor.process_timers(&mut wakers); } Ok(()) } // At least one I/O event occurred. Ok(_) => { // Iterate over sources in the event list. let sources = self.reactor.sources.lock().unwrap(); for ev in self.events.iter() { // Check if there is a source in the table with this key. if let Some(source) = sources.get(ev.key) { let mut state = source.state.lock().unwrap(); // Collect wakers if a writability event was emitted. for &(dir, emitted) in &[(WRITE, ev.writable), (READ, ev.readable)] { if emitted { state[dir].tick = tick; state[dir].drain_into(&mut wakers); } } // ... } } Ok(()) } // ... }; // Wake up ready tasks. log::trace!("react: {} ready wakers", wakers.len()); for waker in wakers { // Don't let a panicking waker blow everything up. panic::catch_unwind(|| waker.wake()).ok(); } res } }
self.reactor.poller.wait(&mut self.events, timeout)
の中を見ていくと epoll_wait
呼んでいたので、大体予想通りですね。
これが、マイコンターゲットだと割り込み待ちで、Waker::wake()
を呼ぶのでしょう。
nRF あたりの割り込み待ち実装を次回眺めてから、Waker::wake()
の中身を追いかければ、大体コアの部分は理解できそうですね。