wasmtime_wasi_http/p3/host/
handler.rs

1use crate::p3::bindings::http::handler::{Host, HostWithStore};
2use crate::p3::bindings::http::types::{ErrorCode, Request, Response};
3use crate::p3::body::{Body, BodyExt as _};
4use crate::p3::{HttpError, HttpResult, WasiHttp, WasiHttpCtxView};
5use anyhow::Context as _;
6use core::task::{Context, Poll, Waker};
7use http_body_util::BodyExt as _;
8use std::sync::Arc;
9use tokio::sync::oneshot;
10use tokio::task::{self, JoinHandle};
11use tracing::debug;
12use wasmtime::component::{Accessor, Resource};
13
14/// A wrapper around [`JoinHandle`], which will [`JoinHandle::abort`] the task
15/// when dropped
16struct AbortOnDropJoinHandle(JoinHandle<()>);
17
18impl Drop for AbortOnDropJoinHandle {
19    fn drop(&mut self) {
20        self.0.abort();
21    }
22}
23
24async fn io_task_result(
25    rx: oneshot::Receiver<(
26        Arc<AbortOnDropJoinHandle>,
27        oneshot::Receiver<Result<(), ErrorCode>>,
28    )>,
29) -> Result<(), ErrorCode> {
30    let Ok((_io, io_result_rx)) = rx.await else {
31        return Ok(());
32    };
33    io_result_rx.await.unwrap_or(Ok(()))
34}
35
36impl HostWithStore for WasiHttp {
37    async fn handle<T>(
38        store: &Accessor<T, Self>,
39        req: Resource<Request>,
40    ) -> HttpResult<Resource<Response>> {
41        // A handle to the I/O task, if spawned, will be sent on this channel
42        // and kept as part of request body state
43        let (io_task_tx, io_task_rx) = oneshot::channel();
44
45        // A handle to the I/O task, if spawned, will be sent on this channel
46        // along with the result receiver
47        let (io_result_tx, io_result_rx) = oneshot::channel();
48
49        // Response processing result will be sent on this channel
50        let (res_result_tx, res_result_rx) = oneshot::channel();
51
52        let getter = store.getter();
53        let fut = store.with(|mut store| {
54            let WasiHttpCtxView { table, .. } = store.get();
55            let req = table
56                .delete(req)
57                .context("failed to delete request from table")
58                .map_err(HttpError::trap)?;
59            let (req, options) =
60                req.into_http_with_getter(&mut store, io_task_result(io_result_rx), getter)?;
61            HttpResult::Ok(store.get().ctx.send_request(
62                req.map(|body| body.with_state(io_task_rx).boxed_unsync()),
63                options.as_deref().copied(),
64                Box::new(async {
65                    // Forward the response processing result to `WasiHttpCtx` implementation
66                    let Ok(fut) = res_result_rx.await else {
67                        return Ok(());
68                    };
69                    Box::into_pin(fut).await
70                }),
71            ))
72        })?;
73        let (res, io) = Box::into_pin(fut).await?;
74        let (
75            http::response::Parts {
76                status, headers, ..
77            },
78            body,
79        ) = res.into_parts();
80
81        let mut io = Box::into_pin(io);
82        let body = match io.as_mut().poll(&mut Context::from_waker(Waker::noop()))? {
83            Poll::Ready(()) => body,
84            Poll::Pending => {
85                // I/O driver still needs to be polled, spawn a task and send handles to it
86                let (tx, rx) = oneshot::channel();
87                let io = task::spawn(async move {
88                    let res = io.await;
89                    debug!(?res, "`send_request` I/O future finished");
90                    _ = tx.send(res);
91                });
92                let io = Arc::new(AbortOnDropJoinHandle(io));
93                _ = io_result_tx.send((Arc::clone(&io), rx));
94                _ = io_task_tx.send(Arc::clone(&io));
95                body.with_state(io).boxed_unsync()
96            }
97        };
98        let res = Response {
99            status,
100            headers: Arc::new(headers),
101            body: Body::Host {
102                body,
103                result_tx: res_result_tx,
104            },
105        };
106        store.with(|mut store| {
107            store
108                .get()
109                .table
110                .push(res)
111                .context("failed to push response to table")
112                .map_err(HttpError::trap)
113        })
114    }
115}
116
117impl Host for WasiHttpCtxView<'_> {}