Skip to main content

wasmtime_wasi_http/p3/host/
handler.rs

1use crate::FieldMap;
2use crate::p3::bindings::http::client::{Host, HostWithStore};
3use crate::p3::bindings::http::types::{ErrorCode, Request, Response};
4use crate::p3::body::{Body, BodyExt as _};
5use crate::p3::{HttpError, HttpResult, WasiHttp, WasiHttpCtxView};
6use core::task::{Context, Poll, Waker};
7use http_body_util::BodyExt as _;
8use std::sync::Arc;
9use tokio::sync::oneshot;
10use tokio::task::{self, JoinHandle};
11use tracing::debug;
12use wasmtime::component::{Accessor, Resource};
13use wasmtime::error::Context as _;
14
15/// A wrapper around [`JoinHandle`], which will [`JoinHandle::abort`] the task
16/// when dropped
17struct AbortOnDropJoinHandle(JoinHandle<()>);
18
19impl Drop for AbortOnDropJoinHandle {
20    fn drop(&mut self) {
21        self.0.abort();
22    }
23}
24
25async fn io_task_result(
26    rx: oneshot::Receiver<(
27        Arc<AbortOnDropJoinHandle>,
28        oneshot::Receiver<Result<(), ErrorCode>>,
29    )>,
30) -> Result<(), ErrorCode> {
31    let Ok((_io, io_result_rx)) = rx.await else {
32        return Ok(());
33    };
34    io_result_rx.await.unwrap_or(Ok(()))
35}
36
37impl HostWithStore for WasiHttp {
38    async fn send<T>(
39        store: &Accessor<T, Self>,
40        req: Resource<Request>,
41    ) -> HttpResult<Resource<Response>> {
42        // A handle to the I/O task, if spawned, will be sent on this channel
43        // and kept as part of request body state
44        let (io_task_tx, io_task_rx) = oneshot::channel();
45
46        // A handle to the I/O task, if spawned, will be sent on this channel
47        // along with the result receiver
48        let (io_result_tx, io_result_rx) = oneshot::channel();
49
50        // Response processing result will be sent on this channel
51        let (res_result_tx, res_result_rx) = oneshot::channel();
52
53        let getter = store.getter();
54        let fut = store.with(|mut store| {
55            let WasiHttpCtxView { table, .. } = store.get();
56            let req = table
57                .delete(req)
58                .context("failed to delete request from table")
59                .map_err(HttpError::trap)?;
60            let (req, options) =
61                req.into_http_with_getter(&mut store, io_task_result(io_result_rx), getter)?;
62            HttpResult::Ok(store.get().hooks.send_request(
63                req.map(|body| body.with_state(io_task_rx).boxed_unsync()),
64                options.as_deref().copied(),
65                Box::new(async {
66                    // Forward the response processing result to `WasiHttpCtx` implementation
67                    let Ok(fut) = res_result_rx.await else {
68                        return Ok(());
69                    };
70                    Box::into_pin(fut).await
71                }),
72            ))
73        })?;
74        let (res, io) = Box::into_pin(fut).await?;
75        let (
76            http::response::Parts {
77                status, headers, ..
78            },
79            body,
80        ) = res.into_parts();
81
82        let mut io = Box::into_pin(io);
83        let body = match io.as_mut().poll(&mut Context::from_waker(Waker::noop()))? {
84            Poll::Ready(()) => body,
85            Poll::Pending => {
86                // I/O driver still needs to be polled, spawn a task and send handles to it
87                let (tx, rx) = oneshot::channel();
88                let io = task::spawn(async move {
89                    let res = io.await;
90                    debug!(?res, "`send_request` I/O future finished");
91                    _ = tx.send(res);
92                });
93                let io = Arc::new(AbortOnDropJoinHandle(io));
94                _ = io_result_tx.send((Arc::clone(&io), rx));
95                _ = io_task_tx.send(Arc::clone(&io));
96                body.with_state(io).boxed_unsync()
97            }
98        };
99        let res = Response {
100            status,
101            headers: FieldMap::new_immutable(headers),
102            body: Body::Host {
103                body,
104                result_tx: res_result_tx,
105            },
106        };
107        store.with(|mut store| {
108            store
109                .get()
110                .table
111                .push(res)
112                .context("failed to push response to table")
113                .map_err(HttpError::trap)
114        })
115    }
116}
117
118impl Host for WasiHttpCtxView<'_> {}