wasmtime_wasi_http/p3/
response.rs

1use crate::get_content_length;
2use crate::p3::bindings::http::types::ErrorCode;
3use crate::p3::body::{Body, GuestBody};
4use crate::p3::{WasiHttpCtxView, WasiHttpView};
5use anyhow::Context as _;
6use bytes::Bytes;
7use http::{HeaderMap, StatusCode};
8use http_body_util::BodyExt as _;
9use http_body_util::combinators::UnsyncBoxBody;
10use std::sync::Arc;
11use wasmtime::AsContextMut;
12
13/// The concrete type behind a `wasi:http/types.response` resource.
14pub struct Response {
15    /// The status of the response.
16    pub status: StatusCode,
17    /// The headers of the response.
18    pub headers: Arc<HeaderMap>,
19    /// Response body.
20    pub(crate) body: Body,
21}
22
23impl TryFrom<Response> for http::Response<Body> {
24    type Error = http::Error;
25
26    fn try_from(
27        Response {
28            status,
29            headers,
30            body,
31        }: Response,
32    ) -> Result<Self, Self::Error> {
33        let mut res = http::Response::builder().status(status);
34        *res.headers_mut().unwrap() = Arc::unwrap_or_clone(headers);
35        res.body(body)
36    }
37}
38
39impl Response {
40    /// Convert [Response] into [http::Response].
41    ///
42    /// The specified [Future] `fut` can be used to communicate
43    /// a response processing error, if any, to the constructor of the response.
44    /// For example, if the response was constructed via `wasi:http/types.response#new`,
45    /// a result sent on `fut` will be forwarded to the guest on the future handle returned.
46    pub fn into_http<T: WasiHttpView + 'static>(
47        self,
48        store: impl AsContextMut<Data = T>,
49        fut: impl Future<Output = Result<(), ErrorCode>> + Send + 'static,
50    ) -> wasmtime::Result<http::Response<UnsyncBoxBody<Bytes, ErrorCode>>> {
51        self.into_http_with_getter(store, fut, T::http)
52    }
53
54    /// Like [`Self::into_http`], but with a custom function for converting `T`
55    /// to a [`WasiHttpCtxView`].
56    pub fn into_http_with_getter<T: 'static>(
57        self,
58        store: impl AsContextMut<Data = T>,
59        fut: impl Future<Output = Result<(), ErrorCode>> + Send + 'static,
60        getter: fn(&mut T) -> WasiHttpCtxView<'_>,
61    ) -> wasmtime::Result<http::Response<UnsyncBoxBody<Bytes, ErrorCode>>> {
62        let res = http::Response::try_from(self)?;
63        let (res, body) = res.into_parts();
64        let body = match body {
65            Body::Guest {
66                contents_rx,
67                trailers_rx,
68                result_tx,
69            } => {
70                // `Content-Length` header value is validated in `fields` implementation
71                let content_length =
72                    get_content_length(&res.headers).context("failed to parse `content-length`")?;
73                GuestBody::new(
74                    store,
75                    contents_rx,
76                    trailers_rx,
77                    result_tx,
78                    fut,
79                    content_length,
80                    ErrorCode::HttpResponseBodySize,
81                    getter,
82                )
83                .boxed_unsync()
84            }
85            Body::Host { body, result_tx } => {
86                _ = result_tx.send(Box::new(fut));
87                body
88            }
89        };
90        Ok(http::Response::from_parts(res, body))
91    }
92
93    /// Convert [http::Response] into [Response].
94    pub fn from_http<T>(
95        res: http::Response<T>,
96    ) -> (
97        Self,
98        impl Future<Output = Result<(), ErrorCode>> + Send + 'static,
99    )
100    where
101        T: http_body::Body<Data = Bytes> + Send + 'static,
102        T::Error: Into<ErrorCode>,
103    {
104        let (parts, body) = res.into_parts();
105        let (result_tx, result_rx) = tokio::sync::oneshot::channel();
106
107        let wasi_response = Response {
108            status: parts.status,
109            headers: Arc::new(parts.headers),
110            body: Body::Host {
111                body: body.map_err(Into::into).boxed_unsync(),
112                result_tx,
113            },
114        };
115
116        let io_future = async {
117            let Ok(fut) = result_rx.await else {
118                return Ok(());
119            };
120            Box::into_pin(fut).await
121        };
122
123        (wasi_response, io_future)
124    }
125}
126
127#[cfg(test)]
128mod tests {
129    use super::*;
130    use core::future::Future;
131    use core::pin::pin;
132    use core::task::{Context, Poll, Waker};
133    use http_body_util::Full;
134
135    #[tokio::test]
136    async fn test_response_from_http() {
137        let http_response = http::Response::builder()
138            .status(StatusCode::OK)
139            .header("x-custom-header", "value123")
140            .body(Full::new(Bytes::from_static(b"hello wasm")))
141            .unwrap();
142
143        let (wasi_resp, io_future) = Response::from_http(http_response);
144        assert_eq!(wasi_resp.status, StatusCode::OK);
145        assert_eq!(
146            wasi_resp.headers.get("x-custom-header").unwrap(),
147            "value123"
148        );
149        match wasi_resp.body {
150            Body::Host { body, result_tx } => {
151                let collected = body.collect().await;
152                assert!(collected.is_ok(), "Body stream failed unexpectedly");
153                let chunks = collected.unwrap().to_bytes();
154                assert_eq!(chunks, &b"hello wasm"[..]);
155                _ = result_tx.send(Box::new(async { Ok(()) }));
156            }
157            _ => panic!("Response body should be of type Host"),
158        }
159
160        let mut cx = Context::from_waker(Waker::noop());
161        let result = pin!(io_future).poll(&mut cx);
162        assert!(matches!(result, Poll::Ready(Ok(_))));
163    }
164}