Rust の組込み用非同期フレームワーク Embassy (5)
ゆるふわ Embassy コードリーディングシリーズです。 前回 spawn 周りはあとタスクの構造だけ、というところまで来たのでタスクの構造を見てみましょう。
タスク関連の構造体としては
- TaskHeader
- TaskStorage
- TaskRef
があります。このへん で定義されています。
TaskHeader は次のように定義されています。 日本語コメントは私の注釈です。
/// Raw task header for use in task pointers. pub(crate) struct TaskHeader { // spawn されたかどうかの状態 pub(crate) state: AtomicU32, // RunQueue に入っている次の TaskHeader への参照 pub(crate) run_queue_item: RunQueueItem, // タスクを実行する Executor への参照 pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>, // poll 時に呼び出す関数ポインタ。実体は `TaskStorage::poll` poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>, }
TaskStrorage は TaskHeader + Future です。
TaskStorage::poll()
が Future::poll()
を呼び出しています。
#[repr(C)] pub struct TaskStorage<F: Future + 'static> { raw: TaskHeader, future: UninitCell<F>, // Valid if STATE_SPAWNED } impl<F: Future + 'static> TaskStorage<F> { const NEW: Self = Self::new(); // ... unsafe fn poll(p: TaskRef) { let this = &*(p.as_ptr() as *const TaskStorage<F>); let future = Pin::new_unchecked(this.future.as_mut()); let waker = waker::from_task(p); let mut cx = Context::from_waker(&waker); match future.poll(&mut cx) { Poll::Ready(_) => { this.future.drop_in_place(); this.raw.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel); #[cfg(feature = "integrated-timers")] this.raw.expires_at.set(Instant::MAX); } Poll::Pending => {} } // the compiler is emitting a virtual call for waker drop, but we know // it's a noop for our waker. mem::forget(waker); }
poll()
のパスを辿ると、こんな感じになりそうです。
Executor(SyncExecutor)::poll()
TaskHeader::poll_fn()
(実体はTaskStorage::poll()
Future::poll()
TaskRef
は TaskHeader
の参照を持っていますが、コメントにある通り、実質は TaskStorage
への参照です (ただし Future
は型は消えています) 。
/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased. #[derive(Clone, Copy)] pub struct TaskRef { ptr: NonNull<TaskHeader>, } impl TaskRef { fn new<F: Future + 'static>(task: &'static TaskStorage<F>) -> Self { Self { ptr: NonNull::from(task).cast(), } } /// Safety: The pointer must have been obtained with `Task::as_ptr` pub(crate) unsafe fn from_ptr(ptr: *const TaskHeader) -> Self { Self { ptr: NonNull::new_unchecked(ptr as *mut TaskHeader), } } pub(crate) fn header(self) -> &'static TaskHeader { unsafe { self.ptr.as_ref() } } /// The returned pointer is valid for the entire TaskStorage. pub(crate) fn as_ptr(self) -> *const TaskHeader { self.ptr.as_ptr() } }
一時、TaskStorage::poll()
で Poll::Ready
のときの値捨てちゃってるけど、async 関数の戻り値ある場合はどうするのでしょう?
と思ったりしましたが、これはあくまでも、embassy_executor の Task を実行しているだけで、embassy_executor の Task は戻り値を持たないのでしょう、という理解に到達しました。
examples/std/src/bin/serial.rs
では Task の中で、async 関数を呼んで、シリアルから読み取ったデータを戻り値として受け取っています。
#[embassy_executor::task] async fn run() { // Open the serial port. let baudrate = termios::BaudRate::B115200; let port = SerialPort::new("/dev/ttyACM0", baudrate).unwrap(); //let port = Spy::new(port); // Use async_io's reactor for async IO. // This demonstrates how embassy's executor can drive futures from another IO library. // Essentially, async_io::Async converts from AsRawFd+Read+Write to futures's AsyncRead+AsyncWrite let port = Async::new(port).unwrap(); // We can then use FromStdIo to convert from futures's AsyncRead+AsyncWrite // to embedded_io's async Read+Write. // // This is not really needed, you could write the code below using futures::io directly. // It's useful if you want to have portable code across embedded and std. let mut port = embedded_io::adapters::FromFutures::new(port); info!("Serial opened!"); loop { let mut buf = [0u8; 256]; let n = port.read(&mut buf).await.unwrap(); info!("read {:?}", &buf[..n]); } }
Future についてはこちらのブログがとてもよくまとまっていて、助かりました。
次は Waker 見ていきましょうね。