wasmtime_wasi_http/p3/
response.rs1use crate::get_content_length;
2use crate::p3::bindings::http::types::ErrorCode;
3use crate::p3::body::{Body, GuestBody};
4use crate::p3::{WasiHttpCtxView, WasiHttpView};
5use anyhow::Context as _;
6use bytes::Bytes;
7use http::{HeaderMap, StatusCode};
8use http_body_util::BodyExt as _;
9use http_body_util::combinators::UnsyncBoxBody;
10use std::sync::Arc;
11use wasmtime::AsContextMut;
12
13pub struct Response {
15 pub status: StatusCode,
17 pub headers: Arc<HeaderMap>,
19 pub(crate) body: Body,
21}
22
23impl TryFrom<Response> for http::Response<Body> {
24 type Error = http::Error;
25
26 fn try_from(
27 Response {
28 status,
29 headers,
30 body,
31 }: Response,
32 ) -> Result<Self, Self::Error> {
33 let mut res = http::Response::builder().status(status);
34 *res.headers_mut().unwrap() = Arc::unwrap_or_clone(headers);
35 res.body(body)
36 }
37}
38
39impl Response {
40 pub fn into_http<T: WasiHttpView + 'static>(
47 self,
48 store: impl AsContextMut<Data = T>,
49 fut: impl Future<Output = Result<(), ErrorCode>> + Send + 'static,
50 ) -> wasmtime::Result<http::Response<UnsyncBoxBody<Bytes, ErrorCode>>> {
51 self.into_http_with_getter(store, fut, T::http)
52 }
53
54 pub fn into_http_with_getter<T: 'static>(
57 self,
58 store: impl AsContextMut<Data = T>,
59 fut: impl Future<Output = Result<(), ErrorCode>> + Send + 'static,
60 getter: fn(&mut T) -> WasiHttpCtxView<'_>,
61 ) -> wasmtime::Result<http::Response<UnsyncBoxBody<Bytes, ErrorCode>>> {
62 let res = http::Response::try_from(self)?;
63 let (res, body) = res.into_parts();
64 let body = match body {
65 Body::Guest {
66 contents_rx,
67 trailers_rx,
68 result_tx,
69 } => {
70 let content_length =
72 get_content_length(&res.headers).context("failed to parse `content-length`")?;
73 GuestBody::new(
74 store,
75 contents_rx,
76 trailers_rx,
77 result_tx,
78 fut,
79 content_length,
80 ErrorCode::HttpResponseBodySize,
81 getter,
82 )
83 .boxed_unsync()
84 }
85 Body::Host { body, result_tx } => {
86 _ = result_tx.send(Box::new(fut));
87 body
88 }
89 };
90 Ok(http::Response::from_parts(res, body))
91 }
92
93 pub fn from_http<T>(
95 res: http::Response<T>,
96 ) -> (
97 Self,
98 impl Future<Output = Result<(), ErrorCode>> + Send + 'static,
99 )
100 where
101 T: http_body::Body<Data = Bytes> + Send + 'static,
102 T::Error: Into<ErrorCode>,
103 {
104 let (parts, body) = res.into_parts();
105 let (result_tx, result_rx) = tokio::sync::oneshot::channel();
106
107 let wasi_response = Response {
108 status: parts.status,
109 headers: Arc::new(parts.headers),
110 body: Body::Host {
111 body: body.map_err(Into::into).boxed_unsync(),
112 result_tx,
113 },
114 };
115
116 let io_future = async {
117 let Ok(fut) = result_rx.await else {
118 return Ok(());
119 };
120 Box::into_pin(fut).await
121 };
122
123 (wasi_response, io_future)
124 }
125}
126
#[cfg(test)]
mod tests {
    use super::*;
    use core::future::Future;
    use core::pin::pin;
    use core::task::{Context, Poll, Waker};
    use http_body_util::Full;

    /// `Response::from_http` must carry over status, headers and body, and
    /// the returned future must yield the result of the future sent over the
    /// body's result channel.
    #[tokio::test]
    async fn test_response_from_http() {
        let payload = Bytes::from_static(b"hello wasm");
        let http_response = http::Response::builder()
            .status(StatusCode::OK)
            .header("x-custom-header", "value123")
            .body(Full::new(payload))
            .unwrap();

        let (wasi_resp, io_future) = Response::from_http(http_response);

        // Status and headers are copied over verbatim.
        assert_eq!(wasi_resp.status, StatusCode::OK);
        let custom_header = wasi_resp.headers.get("x-custom-header").unwrap();
        assert_eq!(custom_header, "value123");

        // A response built from an `http::Response` always has a host body.
        let Body::Host { body, result_tx } = wasi_resp.body else {
            panic!("Response body should be of type Host");
        };
        let collected = body.collect().await;
        assert!(collected.is_ok(), "Body stream failed unexpectedly");
        let received = collected.unwrap().to_bytes();
        assert_eq!(received, Bytes::from_static(b"hello wasm"));

        // Hand over an immediately-successful future, as a consumer of the
        // response would.
        _ = result_tx.send(Box::new(async { Ok(()) }));

        // The value is already in the channel, so one poll is enough.
        let mut cx = Context::from_waker(Waker::noop());
        let outcome = pin!(io_future).poll(&mut cx);
        assert!(matches!(outcome, Poll::Ready(Ok(()))));
    }
}