wasmtime_wasi_http/p3/host/
handler.rs1use crate::FieldMap;
2use crate::p3::bindings::http::client::{Host, HostWithStore};
3use crate::p3::bindings::http::types::{ErrorCode, Request, Response};
4use crate::p3::body::{Body, BodyExt as _};
5use crate::p3::{HttpError, HttpResult, WasiHttp, WasiHttpCtxView};
6use core::task::{Context, Poll, Waker};
7use http_body_util::BodyExt as _;
8use std::sync::Arc;
9use tokio::sync::oneshot;
10use tokio::task::{self, JoinHandle};
11use tracing::debug;
12use wasmtime::component::{Accessor, Resource};
13use wasmtime::error::Context as _;
14
15struct AbortOnDropJoinHandle(JoinHandle<()>);
18
19impl Drop for AbortOnDropJoinHandle {
20 fn drop(&mut self) {
21 self.0.abort();
22 }
23}
24
25async fn io_task_result(
26 rx: oneshot::Receiver<(
27 Arc<AbortOnDropJoinHandle>,
28 oneshot::Receiver<Result<(), ErrorCode>>,
29 )>,
30) -> Result<(), ErrorCode> {
31 let Ok((_io, io_result_rx)) = rx.await else {
32 return Ok(());
33 };
34 io_result_rx.await.unwrap_or(Ok(()))
35}
36
37impl HostWithStore for WasiHttp {
38 async fn send<T>(
39 store: &Accessor<T, Self>,
40 req: Resource<Request>,
41 ) -> HttpResult<Resource<Response>> {
42 let (io_task_tx, io_task_rx) = oneshot::channel();
45
46 let (io_result_tx, io_result_rx) = oneshot::channel();
49
50 let (res_result_tx, res_result_rx) = oneshot::channel();
52
53 let getter = store.getter();
54 let fut = store.with(|mut store| {
55 let WasiHttpCtxView { table, .. } = store.get();
56 let req = table
57 .delete(req)
58 .context("failed to delete request from table")
59 .map_err(HttpError::trap)?;
60 let (req, options) =
61 req.into_http_with_getter(&mut store, io_task_result(io_result_rx), getter)?;
62 HttpResult::Ok(store.get().hooks.send_request(
63 req.map(|body| body.with_state(io_task_rx).boxed_unsync()),
64 options.as_deref().copied(),
65 Box::new(async {
66 let Ok(fut) = res_result_rx.await else {
68 return Ok(());
69 };
70 Box::into_pin(fut).await
71 }),
72 ))
73 })?;
74 let (res, io) = Box::into_pin(fut).await?;
75 let (
76 http::response::Parts {
77 status, headers, ..
78 },
79 body,
80 ) = res.into_parts();
81
82 let mut io = Box::into_pin(io);
83 let body = match io.as_mut().poll(&mut Context::from_waker(Waker::noop()))? {
84 Poll::Ready(()) => body,
85 Poll::Pending => {
86 let (tx, rx) = oneshot::channel();
88 let io = task::spawn(async move {
89 let res = io.await;
90 debug!(?res, "`send_request` I/O future finished");
91 _ = tx.send(res);
92 });
93 let io = Arc::new(AbortOnDropJoinHandle(io));
94 _ = io_result_tx.send((Arc::clone(&io), rx));
95 _ = io_task_tx.send(Arc::clone(&io));
96 body.with_state(io).boxed_unsync()
97 }
98 };
99 let res = Response {
100 status,
101 headers: FieldMap::new_immutable(headers),
102 body: Body::Host {
103 body,
104 result_tx: res_result_tx,
105 },
106 };
107 store.with(|mut store| {
108 store
109 .get()
110 .table
111 .push(res)
112 .context("failed to push response to table")
113 .map_err(HttpError::trap)
114 })
115 }
116}
117
// `Host` is implemented with an empty body for the context view; the `send`
// operation in this file is provided by the `HostWithStore` impl on `WasiHttp`.
impl Host for WasiHttpCtxView<'_> {}