Rustで書かれたVMM firecrackerを読もう!(4)~API Server初期化~

github.com

tomo-wait-for-it-yuki.hatenablog.com

前回は、Vmm::new()を読み、VMMの初期化が大体わかった気になりました。今回はAPI Serverの初期化を見ていきます。 tokioが絡む部分は、非同期処理初心者なので間違っている可能性があります。

ApiServer::new()

src/main.rsのmain関数では、ApiServer::new()を呼んでいました。改めて、このnew()の中身を見てみます。

    // ApiServer::new()の引数を準備しています
    let shared_info = Arc::new(RwLock::new(InstanceInfo {
        state: InstanceState::Uninitialized,
        id: instance_id,
    }));
    let mmds_info = MMDS.clone();
    let (to_vmm, from_api) = channel();

    // ApiServerのオブジェクトを作成
    let server = ApiServer::new(mmds_info, shared_info.clone(), to_vmm).unwrap();

api_server/src/lib.rs

impl ApiServer {
    pub fn new(
        mmds_info: Arc<Mutex<Mmds>>,
        vmm_shared_info: Arc<RwLock<InstanceInfo>>,
        api_request_sender: mpsc::Sender<Box<VmmAction>>,
    ) -> Result<Self> {
        Ok(ApiServer {
            mmds_info,
            vmm_shared_info,
            api_request_sender: Rc::new(api_request_sender),
            efd: Rc::new(EventFd::new().map_err(Error::Eventfd)?),
        })
    }

ApiServer::new()に渡された引数のおさらいです。

  • MMDSは、microVM metadata serviceの略でした。mmds_infoが何者か、は判明していません。
  • vmm_shared_infoは、VMMのthreadと共有しているデータです。
  • api_request_senderは、非同期チャネルのSenderです。Rcで包んでいるので、スレッド内で複数参照されるようです。

ApiServerは、VMM threadにイベントの通知をするためのファイルディスクリプタを持っています。これは、Linux eventfdのラッパーです。

下記のEventFd::new()を見るとわかりますが、libcのeventfdを呼び出しているだけです。

sys_util/src/eventfd.rs

use libc::{c_void, dup, eventfd, read, write};
...
/// A safe wrapper around a Linux eventfd (man 2 eventfd).
///
/// An eventfd is useful because it is sendable across processes and can be used for signaling in
/// and out of the KVM API. They can also be polled like any other file descriptor.
pub struct EventFd {
    eventfd: File,
}

impl EventFd {
    /// Creates a new blocking EventFd with an initial value of 0.
    pub fn new() -> Result<EventFd> {
        // This is safe because eventfd merely allocated an eventfd for our process and we handle
        // the error case.
        let ret = unsafe { eventfd(0, 0) };
        if ret < 0 {
            return errno_result();
        }
        // This is safe because we checked ret for success and know the kernel gave us an fd that we
        // own.
        Ok(EventFd {
            eventfd: unsafe { File::from_raw_fd(ret) },
        })
    }

初期化が終わると、unix socketを受け取って、bind_and_run関数を実行します。

bind_and_run

bind_and_runは、main.rsで次のように呼び出されていました。エラー発生時のみ、mainに帰ってきます。

src/main.rs

    match server.bind_and_run(uds_path_or_fd, start_time_us, start_time_cpu_us) {
        Ok(_) => (),
        Err(Error::Io(inner)) => match inner.kind() {
...
        },
        Err(Error::Eventfd(inner)) => panic!(
...
        ),
    }

bind_and_runでは、まず、tokioのCore::new()を呼んでいます。

api_server/src/lib.rs

impl ApiServer {
...
    // TODO: does tokio_uds also support abstract domain sockets?
    pub fn bind_and_run<P: AsRef<Path>>(
        &self,
        path_or_fd: UnixDomainSocket<P>,
        start_time_us: Option<u64>,
        start_time_cpu_us: Option<u64>,
    ) -> Result<()> {
        let mut core = Core::new().map_err(Error::Io)?;
        let handle = Rc::new(core.handle());

tokioのCoreは、tokio::reactorで実装されているEvent loopです。

少し読んでみようかな、と下記ソースに飛びましたが、諦めてスゴスゴと帰ってきました。

tokio/src/reactor/mod.rs

impl Core {
    /// Creates a new event loop, returning any error that happened during the
    /// creation.
    // わからん!
    pub fn new() -> io::Result<Core> {
        let io = try!(mio::Poll::new());
        let future_pair = mio::Registration::new2();
        try!(io.register(&future_pair.0,
                         TOKEN_FUTURE,
                         mio::Ready::readable(),
                         mio::PollOpt::level()));
        let (tx, rx) = mpsc::unbounded();
        let channel_pair = mio::Registration::new2();
        try!(io.register(&channel_pair.0,
                         TOKEN_MESSAGES,
                         mio::Ready::readable(),
                         mio::PollOpt::level()));
        let rx_readiness = Arc::new(MySetReadiness(channel_pair.1));
        rx_readiness.notify(0);

        Ok(Core {
...

ただ、CoreがEvent loopであることがわかれば、後は、イベント発生時のハンドラを登録して、イベント待ちになるだけ、のはずです。続きを見ると、unix domain socketのlistenerを作成しています。

    pub fn bind_and_run<P: AsRef<Path>>(
...
        // `path_or_fd`はunix domain socketのパスまたはファイルディスクリプタです
        let listener = match path_or_fd {
            UnixDomainSocket::Path(path) => UnixListener::bind(path, &handle).map_err(Error::Io)?,
            UnixDomainSocket::Fd(fd) => {
                // Safe because we assume fd is a valid file descriptor number, associated with a
                // previously bound UnixListener.
                UnixListener::from_listener(
                    unsafe { std::os::unix::net::UnixListener::from_raw_fd(fd) },
                    &handle,
                ).map_err(Error::Io)?
            }
        };

unix domain socketで、API ServerとVMMとの高速なIPC (Inter process communication)を行っている、のだと思います。そのあたりは追々判明するでしょう。 Goならわかるシステムプログラミング Unixドメインソケット

次に、メトリクスに実行開始時間を出力しています。一瞬、目に馴染まない書き方されていますが、METRICSはlazy_staticで初期化されるグローバルオブジェクトで、.api_serverなどはただのpublicなフィールドです。

        if let Some(start_time) = start_time_us {
            let delta_us = (chrono::Utc::now().timestamp_nanos() / 1000) as u64 - start_time;
            METRICS
                .api_server
                .process_startup_time_us
                .add(delta_us as usize);
        }

        if let Some(cpu_start_time) = start_time_cpu_us {
            let delta_us = fc_util::now_cputime_us() - cpu_start_time;
            METRICS
                .api_server
                .process_startup_time_cpu_us
                .add(delta_us as usize);
        }

次は、HTTPサーバの初期化?でしょうか。何回か耳にしたことがあるhyperの文字が見えます。

        let http: Http<hyper::Chunk> = Http::new();

関数の定義は下の通りなので、おそらくあっているでしょう。

hyper/src/server/mod.rs

impl<B: AsRef<[u8]> + 'static> Http<B> {
    /// Creates a new instance of the HTTP protocol, ready to spawn a server or
    /// start accepting connections.
    pub fn new() -> Http<B> {
        Http {
            keep_alive: true,
            max_buf_size: None,
            pipeline: false,
            _marker: PhantomData,
        }
    }

さて、次がよくわからんポイントです。ここで、fFuture traitを実装する型です。

        let f = listener
            .incoming()
            .for_each(|(stream, _)| {
                // For the sake of clarity: when we use self.efd.clone(), the intent is to
                // clone the wrapping Rc, not the EventFd itself.
                let service = ApiServerHttpService::new(
                    self.mmds_info.clone(),
                    self.vmm_shared_info.clone(),
                    self.api_request_sender.clone(),
                    self.efd.clone(),
                );
                let connection = http.serve_connection(stream, service);
                // todo: is spawn() any better/worse than execute()?
                // We have to adjust the future item and error, to fit spawn()'s definition.
                handle.spawn(connection.map(|_| ()).map_err(|_| ()));
                Ok(())
            }).map_err(Error::Io);

listener.incoming()は、IoStream<(UnixStream, SocketAddr)>を返します。incoming()は、listener自身を消費して、IoStream<(UnixStream, SocketAddr)>を作り出すようです。 ソースコードを見てみますが、難しいですね。

tokio-uds-0.1.7/src/lib,rs

    /// Consumes this listener, returning a stream of the sockets this listener
    /// accepts.
    ///
    /// This method returns an implementation of the `Stream` trait which
    /// resolves to the sockets the are accepted on this listener.
    pub fn incoming(self) -> IoStream<(UnixStream, SocketAddr)> {
        struct Incoming {
            inner: UnixListener,
        }

        impl Stream for Incoming {
            type Item = (UnixStream, SocketAddr);
            type Error = io::Error;

            fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
                Ok(Some(try_nb!(self.inner.accept())).into())
            }
        }

        Incoming { inner: self }.boxed()
    }

Stream traitが、SelfからItemへの変換する機能を実装していないと辻褄が合わないですね。 えーっと、Stream traitは、futures::Streamで、Iteratorの非同期バージョン、とのことです。なので、for_each()で処理しているのですね。 IoStreamは、下のように定義されているので、`Incoming { inner: self }.boxed()‘のところで型が合っていますね。

type IoStream<T> = Box<Stream<Item = T, Error = Error> + Send>;

incoming()の謎が解けたところで、for_each()に行くと、UnixStreamstreamとしてクロージャで受け取り、HTTP Serviceを作成して、UnixStreamとbindしている、ということみたいですね。

            .incoming()
            .for_each(|(stream, _)| {
                // For the sake of clarity: when we use self.efd.clone(), the intent is to
                // clone the wrapping Rc, not the EventFd itself.
                let service = ApiServerHttpService::new(
                    self.mmds_info.clone(),
                    self.vmm_shared_info.clone(),
                    self.api_request_sender.clone(),
                    self.efd.clone(),
                );
                let connection = http.serve_connection(stream, service);
                // todo: is spawn() any better/worse than execute()?
                // We have to adjust the future item and error, to fit spawn()'s definition.
                handle.spawn(connection.map(|_| ()).map_err(|_| ()));
                Ok(())
            }).map_err(Error::Io);

ApiServiceHttpServiceは、来たrequestごとに作成されるハンドラ、ということで、unix domain socketにbindされるので、これでVMM threadのほうに通知が行く…のかなぁ?

// In hyper, a struct that implements the Service trait is created to handle each incoming
// request. This is the one for our ApiServer.
pub struct ApiServerHttpService {
    // MMDS info directly accessible from this API thread.
    mmds_info: Arc<Mutex<Mmds>>,
    // VMM instance info directly accessible from this API thread.
    vmm_shared_info: Arc<RwLock<InstanceInfo>>,
    // This allows sending messages to the VMM thread. It makes sense to use a Rc for the sender
    // (instead of cloning) because everything happens on a single thread, so there's no risk of
    // having races (if that was even a problem to begin with).
    api_request_sender: Rc<mpsc::Sender<Box<VmmAction>>>,
    // We write to this EventFd to let the VMM know about new messages.
    vmm_send_event: Rc<EventFd>,
}

ちょっと、わからなくなってしまったので、一旦ここまでにします! hyperとかtokioを少し勉強して出直します。