wasmtime_wasi_http/p3/host/
handler.rs1use crate::p3::bindings::http::handler::{Host, HostWithStore};
2use crate::p3::bindings::http::types::{ErrorCode, Request, Response};
3use crate::p3::body::{Body, BodyExt as _};
4use crate::p3::{HttpError, HttpResult, WasiHttp, WasiHttpCtxView};
5use anyhow::Context as _;
6use core::task::{Context, Poll, Waker};
7use http_body_util::BodyExt as _;
8use std::sync::Arc;
9use tokio::sync::oneshot;
10use tokio::task::{self, JoinHandle};
11use tracing::debug;
12use wasmtime::component::{Accessor, Resource};
13
14struct AbortOnDropJoinHandle(JoinHandle<()>);
17
18impl Drop for AbortOnDropJoinHandle {
19 fn drop(&mut self) {
20 self.0.abort();
21 }
22}
23
24async fn io_task_result(
25 rx: oneshot::Receiver<(
26 Arc<AbortOnDropJoinHandle>,
27 oneshot::Receiver<Result<(), ErrorCode>>,
28 )>,
29) -> Result<(), ErrorCode> {
30 let Ok((_io, io_result_rx)) = rx.await else {
31 return Ok(());
32 };
33 io_result_rx.await.unwrap_or(Ok(()))
34}
35
36impl HostWithStore for WasiHttp {
37 async fn handle<T>(
38 store: &Accessor<T, Self>,
39 req: Resource<Request>,
40 ) -> HttpResult<Resource<Response>> {
41 let (io_task_tx, io_task_rx) = oneshot::channel();
44
45 let (io_result_tx, io_result_rx) = oneshot::channel();
48
49 let (res_result_tx, res_result_rx) = oneshot::channel();
51
52 let getter = store.getter();
53 let fut = store.with(|mut store| {
54 let WasiHttpCtxView { table, .. } = store.get();
55 let req = table
56 .delete(req)
57 .context("failed to delete request from table")
58 .map_err(HttpError::trap)?;
59 let (req, options) =
60 req.into_http_with_getter(&mut store, io_task_result(io_result_rx), getter)?;
61 HttpResult::Ok(store.get().ctx.send_request(
62 req.map(|body| body.with_state(io_task_rx).boxed_unsync()),
63 options.as_deref().copied(),
64 Box::new(async {
65 let Ok(fut) = res_result_rx.await else {
67 return Ok(());
68 };
69 Box::into_pin(fut).await
70 }),
71 ))
72 })?;
73 let (res, io) = Box::into_pin(fut).await?;
74 let (
75 http::response::Parts {
76 status, headers, ..
77 },
78 body,
79 ) = res.into_parts();
80
81 let mut io = Box::into_pin(io);
82 let body = match io.as_mut().poll(&mut Context::from_waker(Waker::noop()))? {
83 Poll::Ready(()) => body,
84 Poll::Pending => {
85 let (tx, rx) = oneshot::channel();
87 let io = task::spawn(async move {
88 let res = io.await;
89 debug!(?res, "`send_request` I/O future finished");
90 _ = tx.send(res);
91 });
92 let io = Arc::new(AbortOnDropJoinHandle(io));
93 _ = io_result_tx.send((Arc::clone(&io), rx));
94 _ = io_task_tx.send(Arc::clone(&io));
95 body.with_state(io).boxed_unsync()
96 }
97 };
98 let res = Response {
99 status,
100 headers: Arc::new(headers),
101 body: Body::Host {
102 body,
103 result_tx: res_result_tx,
104 },
105 };
106 store.with(|mut store| {
107 store
108 .get()
109 .table
110 .push(res)
111 .context("failed to push response to table")
112 .map_err(HttpError::trap)
113 })
114 }
115}
116
117impl Host for WasiHttpCtxView<'_> {}